20201223のPythonに関する記事は30件です。

Pythonでtwitterでいいねしたツイートにある画像を収集するプログラムを作ってみた

この記事はAkatsuki Advent Calender 2020の23日目になります

はじめに

運用して2~3年経っているのですが記事にしておらず今回いい機会なのですることにしました。

まず事の経緯として
twitterを覗いていると収集したい画像が回ってくることがあります。
そこでいちいち右クリック->ダウンロード->パス指定は非常に面倒だしそういうツイートが連続で来たら?となるとやってられません。

そこで何かしらトリガーがあって自動でダウンロードしてくれないかなと。
それで閲覧する用のwebページがあれば完璧ではないかと。
そんな訳で収集欲と技術欲半々のなか作ることにしました。

今回は収集のところだけ。
(それpixivとかで良くない?とか思うかもしれないですが
自分の欲しいやつだけ閲覧できるものが欲しいんです・・・)

要件

・RTではなくいいねのツイートのみダウンロードすることに。
いいねだとクリック手順が1つ少ないので速攻次のツイートに移れる
・収集はレンタルサーバーか何かで行う
自宅に常時つけるPCはないので、収集タイミング時につけてないといけないのは利便性が悪い。
・実装当時はstreaming apiが生きていたのでそれでリアルタイムでいいねツイートを取得
-> なくなったのでrest apiで時間をおいて取得する方にしました。
・ツイート内容やURL、@ IDなどもwebページでの検索用として保存しておく

開発環境

レンタルサーバー
・さくらサーバー

言語
・python3.6

使用ライブラリ
・requests_outhlib
・mysql.connector

Twitter Devへの登録

https://developer.twitter.com/
こちらで登録を行っておき、認証を通せるようにしときます。

実装へ

ところどころ省略しています

1. セッションの生成

twitter developerにてappを作っておく。
そこでKey And Tokenのタブに認証に必要なKeyとTokenがあるのでコピーしておきます。

twitter_screenshot.png

requests_outhlibにてセッションを作る際は
以下のコードで先程のKeyとTokenを指定します。

twitter_session = OAuth1Session(consumerKey
        , consumerSecret
        , accessToken
        , accessSecret)

2. APIの結果取得

https://developer.twitter.com/en/docs/twitter-api/v1/tweets/post-and-engage/api-reference/get-favorites-list
今回使ういいねAPIについてはこちら

コードでは以下のように使用します

#APIのResources URLと渡すパラメータを指定
#パラメータは上のページに載っていますが今回は以下の2つをメインで使います
#screen_name:@で表記するid 本来はuser_idを指定するのが確実ですが、収集アカウントは変える予定ないのでこれで
#count:一度に取得するツイート数 (max:200)
request = twitter_session.get("https://api.twitter.com/1.1/favorites/list.json", params = {"screen_name":ID, "count":200})

#古いのツイートを取得したい場合 パラメータに以下を追加します
#tweet_idよりも前のツイートを取得するようになります。指定なしで最新のツイートからになります
"max_id":tweet_id

結果はjsonで返ってきます。
status_codeでエラーかどうか判別。request.textにjsonのデータが有るためjson.loadsでjsonに変換しておきます。

if request.status_code == 200:
    print("rest connect")
else:
    print("rest Error code%d", request.status_code)
return ""
tweets = json.loads(request.text)

またこれを実行する際も以下の制限に気をつける必要があります
75リクエスト/15分
・取得されるツイートの順番はいいねした順番ではなくいいね先のツイート日時が新しい順に取得されます
なので古いツイートを取得する場合は工夫する必要があります

3. jsonデータを元に画像を取得

先程取得したtweets変数にはツイートが配列が入っているためfor文で1つずつ処理が可能です

for tweet in tweets:
    #ツイートから情報を取得

    #ツイートのID 一番最後のtweet["id"]を上で説明したtwitter_session.getのmax_idに指定することで
    #次のリクエストで取得する結果がかぶることなく処理できます
    tweet["id"]
    #表示する名前 
    tweet["user"]["name"]
    #@id
    tweet["user"]["screen_name"]
    #画像一覧 (URL)
    tweet["extended_entities"]["media"]
    #ツイート文
    tweet["text"]
    #ツイートURLは以下の組み合わせで
    "https://twitter.com/"+tweet["user"]["screen_name"]+"/status/"+tweet["id_str"]

今回は画像一覧なので

    #画像一覧 (URL)
    image_list = tweet["extended_entities"]["media"]

これを使います。
これも配列としてやってくるのでfor文で処理します
色々と省略はしていますが基本的にこれでダウンロード後保存を繰り返しています。

for image in image_list:
    url = image["media_url"]
    img = urllib.request.urlopen(url, timeout = 5).read()
    f = open(path, 'wb')
    f.write(img)
    f.close()

流れとしては以下の通り
・セッション作成
・リミットまでgetリクエスト75回ループ
・取得ツイートの画像を取得
・取得ツイートの最後のツイートIDを保持してリクエスト時に渡す

もし古いツイートを取得する場合は定期的に全いいねを見ていくのが良いかと

4.レンタルサーバーでCRON設定

今回はリクエストが15分ごとに制限があるのでそれに合わせてCRONを設定します
スクリーンショット 2020-12-23 23.28.51.png

これでいいね自動収集化完了ですが
web上で閲覧・検索をするのであればDBにツイートの情報保存したりする必要があります。

最後に

この後、DBやwebページは適宜設定して閲覧はできたのですが、
正直ほとんど見てません。収集とプログラムが動いたのでそこができただけで満足しちゃいました。
とはいえ学習するための題材としてはなかなか良かったと思います。スクリプト言語全然触っていなかったのと
自動化への興味が湧いた一番の作品?だったので。

なお、現在収集できた枚数は
image.png
約 1 7 万 枚

いいねの回数は
image.png
1 5 万 い い ね
自分でもこの人イカれてますねと思いました。自分だけど・・・

もっと良いシステムを作るとするならば機械学習で収集する画像も自動化が一番いいのですが、一度やろうとしたところ
収集したい画像の判定がめちゃんこ難しくて諦めました。自分の好みを落とし込まないといけない難易度高い・・・
学習のための画像はあるんですけどね・・・・・

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

オブジェクトストレージ Cloudian/S3 にJSON形式のデータを書き込みしてみます

はじめに

AWS SDK for Python(boto3) を使って、プログラムをほぼ変更せずに Cloudian と S3 へのアクセスが可能となります。オブジェクトストレージをハイブリッド(オンプレミス:Cloudian、AWS:S3)で使いたい方への参考になればと、、、

概要

オブジェクトストレージ Cloudian/S3 上のバケット名「boto3-cloudian」に、JSON形式のデータを書込む Python プログラムです。

生成するJSON形式のデータ件数はパラメータで指定でき、ファイル「test-iot-dummy.json」に書き込まれます。プログラムをカスタマイズすることにより、なんちゃってIoTデータ生成として使えるかもと想定しております。生成されるデータ項目についてはプログラム内の「items」を参照ください。

  • パラメータは以下の3種類となります。
    • --count : 生成するデータ件数(デフォルト:10件)
    • --proc : 生成するプロセス名(デフォルト:111)
    • --mode : 生成データの出力先の指定 tm:ターミナルへの出力、 s3:Cloudian/S3への出力(デフォルト:tm)

プログラム実行時に、パラメータ「-h」を指定することにより表示されるヘルプも参照ください。

実行環境

macOS Big Sur 11.1
python 3.8.3

クレデンシャル情報の定義

今回はクレデンシャル情報を .zshenv に定義してプログラムを実行しています。接続先に合わせて定義ください。

# AWS S3
export AWS_ACCESS_KEY_ID=xxxxxxxxxxxxx
export AWS_SECRET_ACCESS_KEY=yyyyyyyyyyyyyyyyy
export AWS_DEFAULT_REGION=ap-northeast-1

# Cloudian
#export AWS_ACCESS_KEY_ID=aaaaaaaaaaaaaaaaaa
#export AWS_SECRET_ACCESS_KEY=bbbbbbbbbbbbbbbbbbbb
#export AWS_DEFAULT_REGION=pic

実行プログラム

Cloudianへアクセスする場合は endpoint_url を記載ください(プログラム内を参照ください)。

IoTSample-write.py
import random
import json
import time
from datetime import date, datetime
from collections import OrderedDict
import argparse
import string
import boto3
import pprint

from faker.factory import Factory

BUCKET_NAME = 'boto3-cloudian'
OBJECT_KEY = 'test-iot-dummy.json'


# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")

# IoT機器のダミーセクション(小文字アルファベットを定義)
section = string.ascii_uppercase


# IoT機器で送信JSONデータの作成
def iot_json_data(count, proc):
    iot_items = json.dumps({
        'items': [{
            'id': i,                            # id
            'time': generate_time(),            # データ生成時間
            'proc': proc,                       # データ生成プロセス名
            'section': random.choice(section),  # IoT機器セクション
            'iot_num': fake.zipcode(),          # IoT機器番号
            'iot_state': fake.prefecture(),     # IoT設置場所
            'vol_1': random.uniform(100, 200),  # IoT値−1
            'vol_2': random.uniform(50, 90)     # IoT値−2
            } 
            for i in range(count)
        ]
    }, ensure_ascii=False).encode('utf-8')
    return iot_items


# IoT機器で計測されたダミーデータの生成時間
def generate_time():
    dt_time = datetime.now()
    gtime = json_trans_date(dt_time)
    return gtime

# date, datetimeの変換関数
def json_trans_date(obj):
    # 日付型を文字列に変換
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # 上記以外は対象外.
    raise TypeError ("Type %s not serializable" % type(obj))


# メイン(ターミナル出力用)
def tm_main(count, proc):
    print('ターミナル 出力')
    iotjsondata = iot_json_data(count, proc)
    pprint.pprint(iotjsondata)


# メイン(Cloudian/S3 出力用)
def s3_main(count, proc):
    print('Cloudian/S3 出力')
    iotjsondata = iot_json_data(count, proc)
    # pprint.pprint(iotjsondata)

    # client = boto3.client('s3', endpoint_url='http://s3-pic.networld.local')    # Cloudianへのアクセス時
    client = boto3.client('s3')                                                 # S3へのアクセス時
    client.put_object(
        Bucket=BUCKET_NAME,
        Key=OBJECT_KEY,
        Body=iotjsondata
    )


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='IoT機器のなんちゃってダミーデータの生成')
    parser.add_argument('--count', type=int, default=10, help='データ作成件数')
    parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名')
    parser.add_argument('--mode', type=str, default='tm', help='tm(生成データをターミナル出力)/ s3(生成データをCloudian/S3出力)')
    args = parser.parse_args()

    start = time.time()

    if (args.mode == 's3'): 
        s3_main(args.count, args.proc)
    else :
        tm_main(args.count, args.proc)

    making_time = time.time() - start

    print("")
    print(f"データ作成件数:{args.count}")
    print("データ作成時間(通常_API):{0}".format(making_time) + " [sec]")
    print("")

プログラムの実行

最初にヘルプを表示してみます。

$ python IoTSample-write.py -h            
usage: IoTSample-write.py [-h] [--count COUNT] [--proc PROC] [--mode MODE]

IoT機器のなんちゃってダミーデータの生成

optional arguments:
  -h, --help     show this help message and exit
  --count COUNT  データ作成件数
  --proc PROC    データ作成プロセス名
  --mode MODE    tm(生成データをターミナル出力)/ s3(生成データをCloudian/S3出力)

次に、10万件のデータを生成し、それをターミナル出力してみます

$ python IoTSample-write.py --count 100000
     :
    出力内容は割愛
     :
データ作成件数:100000
データ作成時間(通常_API):4.5370988845825195 [sec]

今度は実際に Cloudian/S3 に10万件のデータを生成してみます。

$ python IoTSample-write.py --count 100000 --mode s3

データ作成件数:100000
データ作成時間(通常_API):2.7221038341522217 [sec]

まとめ

今回は、AWS SDK for Python(boto3) を使って、オブジェクトストレージ Cloudian / S3 へデータを生成することを確認できました(10万件のデータを数秒で作成完了(もちろん環境に依存します))。

Cloudianについては、ここ を確認ください。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

[Python] 区間スケジューリング ABC103D

ABC103D

下図のようにすべて串刺しにするには、最小何本必要か、という問題である。
image.png
区間スケジューリング問題とは以下のような問題である:

M 個の区間が与えられ、どの 2 つの区間も時間帯を共有しないように最大個数の区間を選べ

蟻本の Greedy の章の最初にも載っている有名問題で、区間の終端でソートして Greedy にとっていけばよい。

実は今回の問題の答えは、区間スケジューリング問題の最適解と同じになる:

  • まず区間スケジューリング問題の答えが k 個だった場合、その k 個は時間帯を共有しないので、それらを全部刺すには最低でも k 本の串が必要である (弱双対性)

逆に k 本の串があれば十分であることは、区間スケジューリング問題に対する貪欲法の動きを注意深く追うと理解することができる。具体的には、

  • 区間スケジューリング問題で選ぶ k 個の区間に対して、その右端から串を刺していけば、ちょうど k 本の串ですべての区間を串刺しにできる (強双対性)
サンプルコード
from operator import itemgetter

n, m = map(int, input().split())

# 区間の終端でソート
ab = sorted([tuple(map(int, input().split())) for i in range(m)], key=itemgetter(1))
# 前回除いた橋
removed = -1
ans = 0

for a, b in ab:
    # a が removed より大きい = まだ取り除いてない
    if a > removed:
        removed = b - 1
        ans += 1

print(ans)
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

statsmodelsでホールドアウト検証を実装する

はじめに

pythonでは,ロジスティック回帰モデルを利用できるライブラリとして主にsklearnとstatsmodelsが用いられます.statsmodelsには係数の有意差検定を自動でしてくれる等のsklearnにはない利点がある一方で,代表的なモデルの評価方法であるホールドアウト法や交差検証法に対応していません.そこで今回は,statsmodelsでホールドアウト法を実装するためのコードを作成してみます.

ライブラリのインストール

sample.ipynb
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
import statsmodels.api as sm

データのインストールと前処理

データには,私が卒業研究のために独自に集めたクラウドファンディングに関するデータを用います.
このデータは私のgithubページに置いているので必要なら自分の環境にダウンロードしてください.

sample.ipynb
#csvファイルの読み込み
cultured = pd.read_csv("cultured.csvへのpath")

#目的変数を作成 0:クラウドファンディング失敗 1:クラウドファンディング成功
cultured["achievement"] = cultured["支援総額"] // cultured["目標金額"]
cultured["target"] = 0
cultured.loc[cultured['achievement']>=1,'target'] = 1

#目的変数(y)と説明変数(x)に分ける
#add_constantで定数項を作成
y = cultured["target"]
x_pre = cultured[["目標金額","支援者数","文字数","活動報告回数"]]
x = sm.add_constant(x_pre)

このデータは,説明変数である目標金額,支援者数,文字数,活動報告回数からクラウドファンディングプロジェクトが成功(y=1)か失敗(y=0)かを予測するためのものです.sklearnのロジスティック回帰では,定数項を勝手に生成してくれますが,statsmodelsにはその機能はないのでadd_constant()を用いて生成しています.
説明変数は(x)はこんな感じです.
スクリーンショット 2020-12-23 22.38.07.png

実装(この記事のメイン)

sample.ipynb
#ホールドアウト法
def hold_out(x,y):
    #データを訓練データとテストデータに分割
    #test_sizeはデータ全体に対するテストデータの割合
    X_train, X_test, y_train, y_test = train_test_split(x,y,test_size=0.2,random_state=42)
    #訓練データを用いて学習
    model = sm.Logit(y_train, X_train)
    results = model.fit()
    #テストデータに対する予測をpredに格納
    #ただし出力値は目的変数が1になる確率(今回だと成功確率)なので注意
    pred = results.predict(X_test)
    #確率が0.5より大きいものを1に,それ以外を0に変換
    #リスト内表記を利用
    result = [1 if i>0.5 else 0 for i in pred]
    #train_test_splitでインデックスの順番がめちゃくちゃなので振り直す
    y_test_re = y_test.reset_index(drop=True)
    #カウントに初期値を格納
    count=0
    #y_testと予測値の値が一致するならばcountに1を足す
    for i in range(len(y_test)):
        if y_test_re[i] == result[i]:
            count+=1
    #戻り値は予測の精度
    return count/len(y_test)

結果

sample.ipynb
hold_out(x,y)

関数を実行すると...
私の環境だと0.878でした!

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Unix Domain SocketによるuWSGIとNginxの通信

本記事は東京学芸大学 櫨山研究室 Advent Calendar 2020の16日目の記事になります.

はじめに

本記事はPythonのWebフレームワークであるflaskで作成したアプリケーションをuWSGIサーバを使って動作させる内容を通してUnix Domain Socket通信について学ぶという内容になります.

Unix Domain Socketとは

Unix Domain SocketはPOSIX系のOSに搭載されている機能でTCP/UDPによるソケット通信とは異なり,カーネル内部で完結する高速なネットワークインタフェースを作成します.
カーネル内部で完結するため外部のコンピュータとは接続することはできません.

flaskのアプリケーションの作成

エンドポイント/でアクセスしてHello, Worldとだけ返す簡単なアプリケーションを作成します.

app.py
from flask import Flask
app = Flask(__name__)

@app.route("/")
def index():
    return "Hello World"

if __name__ == "__main__":
    app.run(host="0.0.0.0", debug=True)

そして以下のコマンドを実行することでアプリケーションが起動します.
この時,起動には開発用の組み込みサーバが立ち上がります.

python app.py

http://localhost:5000/
にアクセスしてHello, Worldと表示されれば成功です.

uWSGIを使って動かす

先ほどの例では開発用の組み込みサーバを使用していました.
PythonにはアプリケーションサーバとしてuWSGIがあります.
uWSGIではUnix Domain Socketでの通信がサポートされています.

まずはuWSGIを使って先ほどのアプリケーションを動作させてみます.

pip install uWSGI
app.py
from flask import Flask
app = Flask(__name__)

@app.route("/")
def index():
    return "Hello World"

if __name__ == "__main__":
    app.run()

uWSGIで先ほどのアプリケーションを起動するには以下のコマンドを入力します.

uwsgi --http=0.0.0.0:4000 --wsgi-file=app.py --callable=app

http://localhost:4000/
にアクセスしてHello, Worldと表示されれば成功です.

uwsgi.iniという設定ファイルを作成することでより短いコマンドで起動することも可能です.

uwsgi.ini
[uwsgi]
wsgi-file=app.py
callable=app

http=0.0.0.0:4000
uwsgi uwsgi.ini

Nginxの導入

さてここからが本題のUnix Domina Socketでのアクセスです.これまでの起動方法ではHTTP通信(TCPソケット)でのアクセスでした.

実際に運用する場合は同一サーバで複数のアプリケーションを動作させることを考慮しNginxをリバースプロキシとする場合が多いでしょう.
この時,同一サーバ内の通信であればUnix Domain Socketを使ってHTTP通信よりも高速にアクセスすることができます.

今回は便宜的にDockerを用いて動作させてみます.

uwsgi.iniの編集

uwsgi.iniを編集してUnix Domain Socketで通信するようにします.
Unix Domain Socketではファイルシステム上の指定した位置にソケットファイルができます.
クライアントはソケットファイルにアクセスを試みます.
つまりIPアドレスとポート番号ではなくファイルパスによって通信相手を決めます.

今回は/var/app/app.sockというソケットファイルを作成するようにします.

uwsgi.ini
[uwsgi]
wsgi-file=app.py
callable=app

http=0.0.0.0:4000
# 追記
socket=/var/app/app.sock

Docker環境の準備

まずは先ほどのflaskのアプリケーションを動作させる環境を用意するDockerfileを用意します.

Dockerfile
FROM python:3.7

ENV LANG C.UTF-8
ENV TZ Asia/Tokyo

WORKDIR /app
ADD app.py ./
ADD uwsgi.ini ./

RUN pip install flask uWSGI

CMD ["uwsgi", "uwsgi.ini"]

nginxも動かしたいのでdocker-compose.ymlを用意します.

docker-compose.yml
version: '3.7'

services:

  app:
    container_name: uds_app
    build:
      context: .
      dockerfile: ./Dockerfile
    volumes:
      - ./tmp/:/var/app/
    tty: true

  nginx:
    container_name: uds_nginx
    image: nginx
    volumes:
      - ./tmp:/var/app/
      - ./nginx.conf:/etc/nginx/conf.d/default.conf
    ports:
      - 80:80
    tty: true

ここで重要なのはホストマシン上の./tmpディレクトリをflaskアプリケーションコンテナとNginxコンテナの両方と共有していることです.

  1. ホストマシン上の./tmpディレクトリ ←→ flaskアプリケーションコンテナの/var/appディレクトリ
  2. ホストマシン上の./tmpディレクトリ ←→ Nginxコンテナの/var/appディレクトリ

これによりNginxコンテナからもソケットファイルを参照できます.

Nginxの設定ファイルを記述し/へのアクセスをflaskのアプリケーションを呼び出すようにします.

nginx.conf
server {
    listen       80 default_server;
    listen       [::]:80 default_server;
    server_name  _;

    location / {
    include uwsgi_params;
    uwsgi_pass unix:/var/app/app.sock;
    }

    error_page 404 /404.html;
    location = /404.html {
    }

    error_page 500 502 503 504 /50x.html;
    location = /50x.html {
    }
}

docker-compose upコマンドで起動してみましょう.

docker-compose up

http://localhost/
にアクセスして先ほどと同様にHello, Worldの文字列が確認できれば成功です.

おわりに

本記事ではflaskで作成したアプリケーションにリバースプロキシのNginxからUnix Domain Socketで通信するということを扱いました.

TCP通信と異なりファイルパスによって通信相手を決めるという部分が伝わればと思います.
また余裕があれば性能も調査してきたいですね.

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Google Colab で Dash する

python のデータ可視化のフレームワークに Dash(https://dash.plotly.com/) があります。

これが Google Colab でも利用可能に、、とすでに解説されている良記事を見つけてしまいました。
https://qiita.com/OgawaHideyuki/items/725f4ffd93ffb0d30b6c

ということで、この記事は参考にさせていただいて自分の手を動かしました、という記録です。
お題は安直にコロナ感染者数の可視化です。

使用する環境など

Google Colaboratory を使用します。
その他は下記です。

  • Dash(https://dash.plotly.com/) グラフライブラリである plotly を使用して、インンタラクティブなデータ可視化画面を簡単に作れる python フレームワークです。
  • Johns Hopkins 大学の国・地域別感染者データを使わせていただきます。 https://github.com/CSSEGISandData/COVID-19

今回のノートブックは下記に置いています。
https://colab.research.google.com/drive/1fUP4818fSsFFFlUHlLGNoTxq8uoL2VAu?usp=sharing

実行準備とデータ

まず Google Colab/Jupyter ノートブックから Dash を使用するためのパッケージをインストールします。

! pip install jupyter_dash
! pip install --upgrade plotly

Dash と関連するパッケージを import します。

import dash 
from jupyter_dash import JupyterDash 
import dash_core_components as dcc 
import dash_html_components as html 
import plotly.express as px
from dash.dependencies import Input, Output

コロナの感染者データを GitHub から取得します。
データに関しては下記のページを参考にさせていただいています。
https://dodotechno.com/covd-19-visualization/

! wget https://github.com/CSSEGISandData/COVID-19/raw/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv

データフレームにダウンロードした csv を読み込みます。

import pandas as pd
df = pd.read_csv("time_series_covid19_confirmed_global.csv")

region ごとの集計、緯度経度の削除、国ごとの列に転換し、date 列の追加を行います。

df = df.groupby(['Country/Region'], as_index=False).sum()
df.drop(["Lat","Long"], axis=1,inplace=True)
df = df.T
df.columns =  df.iloc[0]    
df = df[1:]
df.reset_index(inplace=True)
df.rename(columns={'index': 'date'},inplace=True)
df

結果、得られる表は以下のようになります。

image.png

画面に表示してみる

まずは日本の感染者数を表示してみます。
第一波(4月下旬)、第二波(8月上旬)、第三波(11月)らしき増加が見えます。

px.line(df, x="date", y="Japan")

image.png

つづいて任意の国を選択可能にしてみます。
ノートブック上で選択できますので、試してみてください。
https://colab.research.google.com/drive/1fUP4818fSsFFFlUHlLGNoTxq8uoL2VAu#scrollTo=Kr-FsvLIpCoN&line=1&uniqifier=1

app = JupyterDash(__name__)

app.layout = html.Div([
  dcc.Dropdown(id="my_dropdown",
    options=[{"value": country, "label": country} for country in df.columns.unique()],
    value=["Japan"],
    multi=True
    ),
  dcc.Graph(id="my_graph")
])

@app.callback(Output("my_graph", "figure"), Input("my_dropdown", "value"))
def update_graph(selected_country):
  return px.line(df, x="date", y=selected_country)

app.run_server(mode="inline")

ドロップダウンから、日本とカナダを選択して表示します。カナダも増加傾向のようですね。

image.png

アメリカを追加して見ると、本当に日本の比ではないですね。。やはりグラフで見るとインパクトがあります。
ワクチンが有効打になって欲しいです(人ごとではありませんが)。

image.png

おわりに

ちょっとした可視化なら簡単に実装でき、Google Colab でインターネット公開も容易です。
csv の加工が一番手間かも。
地図上の可視化なども追加してみたいと思います。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

DjangoでCORSカスタムミドルウェアをつくった

この記事は、 岩手大学 Advent Calendar 2020 の24日目の記事です。

はじめに

個人開発でDjangoを使ったAPIをつくるときがありました。
そのとき、APIの実装でCORS問題にぶち当たり、CORSの対応をする必要がありました。

Djangoには django-cors-headers というものがあります。しかし、django-cors-headersを使って実装をしてみましたが、どうもAccess-Control-Allow-Originがうまく指定できず、他のサイトからでもリクエストを送れるようになってしまっていました。
そこで、Djangoのミドルウェアを作成しCORS対応をしましたのでご紹介します。

Django初心者なので、間違いなどがありましたらご指摘していただけると勉強になります。

実装

実装は以下のようになりました。

custom_middlewares/custom_cors_middleware.py
from django.http import HttpResponse
from django.utils.deprecation import MiddlewareMixin


class CustomCorsMiddleware(MiddlewareMixin):
    def process_request(self, request):
        if request.method == 'OPTIONS':
            response = HttpResponse()
            response['Access-Control-Allow-Origin'] = 'http://localhost:3000' # クライアントのオリジン
            response['Access-Control-Allow-Headers'] = ', '.join([ # 許可するHeaderを追加
                'Content-Type',
            ])
            response['Access-Control-Allow-Methods'] = ', '.join([ # 許可するリクエストメソッドを追加
                'DELETE',
                'GET',
                'OPTIONS',
                'PATCH',
                'POST',
                'PUT',
            ])
            return response
        else:
            return None

    def process_response(self, request, response):
        response['Access-Control-Allow-Origin'] = 'http://localhost:3000' # レスポンスを読み取ることができるオリジン
        response['Content-Type'] = 'application/json' # レスポンスタイプ
        return response

process_requestメソッドは、リクエストが来たときに実行され、Noneが返り値の場合はルーティングされているviewが実行されます。

リクエスト
↓
process_request が実行 → 返り値がある場合はその返り値を返す
↓
返り値が None の場合は、リクエストのルーティングに対応する`view`が実行される

上記のコードでは、プリフライトリクエストに備えて、OPTIONSメソッドの場合は、許可するオリジン・ヘッダー・リクエストメソッドをヘッダーに付与したレスポンスを返すようにしています。

次に、process_responseメソッドは、レスポンスをするときの最後に実行されます。

view を実行
↓
process_request が view から返されたレスポンスを受け取り、レスポンスに処理を行う
↓
レスポンス

上記のコードでは、CORSでよく見る

Access to XMLHttpRequest at 'http://locahost:8000' from origin 'http://localhost:3000' has been blocked by CORS policy: No 'Access-Control-Allow-Origin' header is present on the requested resource.

のためにレスポンスを読み取ることができるオリジンを指定し、またレスポンスタイプにapplication/jsonを指定しています。

あとは、このカスタムミドルウェアをsettings.pyに追加してあげれば完了です。

your_application_name/settings.py
MIDDLEWARE = [
    'custom_middlewares.custom_cors_middleware.CustomCorsMiddleware', # 追加
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

おわりに

Djangoを触ってみて思ったのは、日本語のドキュメントが少ないことです。
英語のドキュメントでも日本語のように読めると苦労が減るのかもしれませんね。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

木構造の復習ついでに BFS に挑戦

こんばんは(*´ω`)

最近、leatcode の問題が少しづつ解けるようになってきました。
皆様のおかげです、有難うございます。 m(_ _)m

あくまで私見ですが、問題の傾向としてアルゴリズムの基本を
理解しているかを問われているような印象を持ちました。
まだ easy, チョット medium のレベルで恐縮ですが。。

新しい問題にチャレンジする際に行き詰ったら
基本に戻って理解を底上げすることが前進するキッカケになると思います。
行き詰った方が居たら、ぜひ基本に戻ってみることをお勧めいたします。

微力ながら次の leetcode の問題にチャレンジしたいので
基本の復習 + α(deque の勉強) として本誌を書くことにしました。

はい、本題です。
まずは何も見ないで
どんなツリーを作りたいか想像して書いてみました。

myTree.py
### TreeImage ###
#       0       #
#      / \      #
#     1   2     #
#    /     \    #
#   3       4   #
#  /         \  #
# 5           6 #
#################
class tree_node:
    def __init__(self,val,left=None,right=None):
        self.val = val
        self.left = left
        self.right = right

class tree:
    def __init__(self):
        self.root = None

    def add_left(self,val):

        if not self.root:
            self.root = tree_node(val)
        else:
            if not self.root.left:
                self.root.left = tree_node(val)
            else:
                runner = self.root
                while runner.left:
                    runner = runner.left
                runner.left = tree_node(val)

    def add_right(self,val):

        if not self.root:
            self.root = tree_node(val)
        else:
            if not self.root.right:
                self.root.right = tree_node(val)
            else:
                runner = self.root
                while runner.right:
                    runner = runner.right
                runner.right = tree_node(val)


T = tree()
T.add_left(0)

T.add_left(1)
T.add_left(3)
T.add_left(5)

T.add_right(2)
T.add_right(4)
T.add_right(6)          

イメージ通りか確認してみましょう。
確認方法ですが、deque を使います。
今日触ったばかりなのでイメージが怪しいですが、
とりあえず TreeNode を丸々バッファすることができます。
この特徴を利用して BFS で読み込むにはどうしたら良いのか検討してみました。
こんな記述はどうでしょうか。

myTree.py
    def show(self):
        head = self.root
        q = deque([head])
        ans = []
        while q:
            nums = q.popleft()
            if not nums:
                continue
            ans.append(nums.val)
            if nums.left or nums.right:
                q.append(nums.left)
                q.append(nums.right)
        print(ans)

今回作成したツリーはこちらでした。

tree_image.py
### TreeImage ###
#       0       #
#      / \      #
#     1   2     #
#    /     \    #
#   3       4   #
#  /         \  #
# 5           6 #
#################

一応、イメージとしては、以下にあるように、
上から矢印の方向に、同一層の値を順番に読みだしてリストに順次格納していくつもりです(笑)

tree_image.py
### TreeImage ###
# →      0       #
#       / \      #
# →    1   2     #
#     /     \    #
# →  3       4   #
#   /         \  #
# →5           6 #
#################

そのため、配列に直して表示すると、[0,1,2,3,4,5,6] ってなるはずです。
それでは実行してみましょう。

実行結果.py
[0, 1, 2, 3, 4, 5, 6]

うん、大丈夫そうです。
因みに、ジグザグっぽく読んでも一応 BFS ですよね?
イメージはこんな感じです。

tree_image.py
### TreeImage ###
# →      0         #
#       / \        #
#      1   2     ← #
#     /     \      #
# →  3       4     #
#   /         \    #
#  5           6 ← #
#################

ではでは行ってみましょう。

myTree.py
    def show(self):
        head = self.root
        q = deque([head])
        ans = []
        i = 0
        while q:
            if i%2 == 0:
                nums = q.popleft()
            else:
                nums = q.pop()

            if not nums:
                continue
            ans.append(nums.val)
            if nums.left or nums.right:
                q.append(nums.left)
                q.append(nums.right)
            i += 1

        print(ans) 
実行結果.py
### TreeImage ###
# →      0         #
#       / \        #
#      1   2     ← #
#     /     \      #
# →  3       4     #
#   /         \    #
#  5           6 ← #
#################
[0, 2, 1, 3, 4, 6, 5]

OK ですね、勉強になって良かった良かった( *´艸`)
木構造は冒頭に書いたやり方以外にもリストで作るやり方もあるようです。
やっておいた方がいいなー( ´ー`)y-~~
多分、触ってみた感想を本誌に追記しておくと思います。
ではでは( `ー´)ノ

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

機械学習エンジニア弁護士が解説するAIと権利の話

みなさん、こんにちは。
こちらは「ABEJAアドベントカレンダー2020」の23日目の記事です。

はじめに

私は、弁護士として法律事務所に所属しつつ、ABEJAで法務のサポートなどをしています。

また、JDLAのE資格を取得したので、機械学習エンジニアを名乗って少しだけエンジニア的な活動をしたりもしています。

仕事柄、AIと法律の話をすることが多いのですが、その際に私がよく質問されるのが、

① 所有権とか著作権とか特許権とか、権利がいろいろあってAIとの関係で何が問題になるかよくわからない
② データの利用が法的にOK/NGってどういう観点から検討してるのかよくわからない
③ AI開発の委託・受託のときのモデルの権利問題がよくわからない

といったものです。

そこで今日は、この3つの問題をできるだけわかりやすく、一気に解説してしまおう!という記事を書きました。
経産省の「AI・データの利用に関する契約ガイドライン(AI編)」にも少し触れながら解説します(この記事では「AI契約ガイドライン」といいます)。

長文なので、気になるところだけでも読んで参考にしていただければ幸いです!

(ちなみに、ABEJA法務にはRDチームの立ち上げ経験があり自分で機械学習モデルの開発もやっちゃう弁護士もおりまして、その弁護士が書いた「AIと公平性」の記事も絶賛公開中です!)

ということで、早速本題に入っていきましょう!

よくある質問①:所有権とか著作権とか特許権とか、権利がいろいろあってAIとの関係で何が問題になるかよくわからない

AIの開発段階・利用段階は以下のように図示することができます(AI契約ガイドライン12頁より引用)。

image.png

また、AIの開発過程で様々なノウハウが共有されますし、新たに生まれてきます。

そうすると、開発の際の契約で問題になりそうなものとしては、生データ・学習用データセット・学習用プログラム・推論プログラム・学習済みパラメータ・ノウハウといったものが挙げられそうです。

権利と聞いてパッと思いつくのは、所有権・著作権・特許権あたりでしょうか(「知的財産権」が思いついたという方もいらっしゃるかもしれませんが、知的財産権は著作権・特許権・商標権…といった権利の総称として使われる言葉なので、今回は使いません)。

そうすると、次の表の空欄を埋めることができれば「何の権利が問題になるか」という疑問には答えられそうです。
空欄の中身がわかりますか?

image.png

それでは、空欄を埋めるべく、所有権・著作権・特許権を簡単に解説したいと思います。

(1) 所有権

答えから言うと、上記の表では所有権はいずれも発生しません!
なぜなら、所有権は「有体物」に生じる、とされているからです(民法206条→85条を読んでいただけるとわかります)。データやプログラムといったものは有体物ではないので、これらには所有権は生じない、ということです。データが記録媒体に保存されている場合、記録媒体を所有していても、中身のデータを所有している、とは言えません。
ということで、所有権の部分の答えは以下のようになります。

image.png

(2) 著作権

所有権とは異なり、著作権は、基本的に無体物に発生します。著作権は、「思想又は感情を創作的に表現したもの」に発生しえます。

著作権は、絵画や音楽などに加え、創作性のある文章や写真、プログラムにも発生しえます。

したがって、例えば学習につかう生データが、誰かが書いた文章や誰かが撮影した写真などであった場合、当該データには著作権が発生している可能性があります(他方、機械的に取得した数値データや機械的に撮影した写真などには、基本的に著作権は発生しません。また、パラメータにも発生しません)。

学習用プログラムや推論プログラムのソースコードにも、著作権が発生しえます。

ということで、著作権の部分の答えは以下のようになります。

image.png

(学習用データセットについては、△としていますが、「データベースの著作物」という解説しだすとややこしい問題があるので、ここでは省略します。)

ここで、「著作権が発生する」ということの意味を少しだけ解説します。
著作権は、よく「権利の束である」と言われます。どういうことかというと、著作権を持つと、著作物をコピーしたり(複製権)、Qiitaで公開したり(公衆送信権)、編集したり(翻案権)、といったことができるようになりますし、他人が無断でそういった行為をしていたらやめるように求めることができます。
著作権は、こういったいろいろな権利を束にしたものなのです。

image.png

所有権が、所有する「物」を支配する権利であるのに対し、著作権が無体物である創作に対する権利であることが何となくイメージできましたでしょうか?
例えば、絵画を例に挙げると、物体としての「絵」には所有権が生じ、無体物としての「絵」(これは見た感じでは物体としての絵と一体となっています)があります。物体としての絵が盗まれた場合には、所有権に基づいて返還請求をしますが、似たような絵を描いて販売しているのをやめさせたい場合には、基本的に所有権では対応できません(物体としての絵を奪われたわけではないため)。この場合には著作権を根拠に、似たような絵の作成をやめるよう請求することになります。

(3) 特許権

特許権は、発明をした場合に、当該発明を出願し、登録されることで効力を生じる権利です。

著作権は著作物ができれば同時に発生しますが、特許権は出願し、登録されないと発生しないという点が一つの特徴です。

では、特許権はどういったものに生じるでしょうか。
特許権は発明である必要があるので、生データといった単純なデータには生じないのが通常です。
学習用プログラムについては、純粋にAIのモデルのみの開発を行う(周辺のシステムなどは含まない)場合を考えると、アルゴリズムについての発明に特許権が成立する可能性があります(有名な発明としては、Googleのバッチノーマライゼーションがありますね。バッチノーマライゼーションは米国等ではすでに特許として登録されており、日本でも特許出願されています。こういった、GAFA等による重要特許の取得についても語りたいことはたくさんあるのですが、ここでは書ききれないのでまたの機会にします)。

ということで、特許権の部分の答えも埋まりましたね!

image.png

(4) 権利が発生しないものもある?

と、ここで気になる点が出てきたのではないでしょうか?
それは、全てが×(要は所有権・著作権・特許権のいずれも発生しえない)のものがあるということです。例えば、創作性のない生データや学習済みパラメータがそうですね。学習済みパラメータは、重要な情報ではありますが、創作性のない単純な行列の値なので著作権も特許権も基本的に生じないと考えられています。
こういった、「何も権利が生じていない情報」は何か保護がされるのでしょうか?
答えは「保護はされない」となります(ここでは割愛しますが、「営業秘密」「限定提供データ」などの例外はあります)。

つまり、権利が発生しない情報をうっかり第三者に渡してしまった場合、第三者が情報をどう扱おうと、基本的になにも文句は言えないということになります。

それは困る!ということで出てくるのがNDAなどの秘密保持条項です。秘密保持条項では、通常、
・情報を●●という目的以外には使ってはならない(目的の限定)
・第三者に開示してはならない(対象者の限定)
・複製してはならない(利用態様の限定)
といった制約が置かれていますが、これは、権利が発生しない情報を守る、という効果もあるのです(情報授受の前にいちいちNDAを締結するのは面倒ですが、実はすごく重要だということがお分かりいただけるかと思います)。

(5) まとめ

ということで、いったんここまでのポイントをまとめます。

まず、この図は重要ですね。

image.png

その他、以下の点が重要です。

  • 所有権は有体物にしか生じないため、データやプログラムには生じない。
  • 著作権・特許権は無体物に生じる。
  • 著作権は創作と同時に発生するが、特許権は登録しないと発生しない。
  • 著作権は権利の束。著作権を持っていると、複製、公表、編集など様々なことができる(著作権を持っていないとできない)。
  • 所有権・著作権・特許権などの権利が発生しない情報もある。そのような情報を保護するには、開示する際に契約を締結して制限をかける必要がある。

よくある質問②:データの収集・利用が法的にOK/NGってどういう観点から検討してるのかよくわからない

(1) データの収集方法ごとの検討

機械学習では、まずデータを収集する必要があります。
データの確保には様々な方法がありますが、①自社で用意する、②特定の企業からデータを買う、③公開されているデータセットを使う、④不特定多数から収集する、といった方法が考えられますので、順にみていきましょう。

① 自社で用意する場合

①自社で用意する場合の例はたくさんありますが、例えば、不良品検知のAIを作るために、自社製品の写真を大量に準備する、といったことが考えられます。この場合、準備した画像は(おそらく機械的に撮影するでしょうから)通常は創作性がなく、著作権は発生しなそうですね(生データに著作権が発生するかどうかは、データの創作性の有無によって異なってくることを先ほど説明しました。)。利用にあたっても、特に法的な障害はなさそうです。

② 特定の企業からデータを買ってくる場合

②特定の企業からデータを買ってくる場合には、売主の企業が権利を持っていることを確認して、権利ごと譲ってもらえば大丈夫そうです。また、権利は売主に残しつつ、学習に使うことを許可してもらう形でも良さそうです。
要は、締結する契約の内容次第ということですね。
実は、データの取引についても経産省からガイドラインが出ています(「AI・データの利用に関する契約ガイドライン(データ編)」)。興味がある人は見てみてください。

③ 公開されているデータセットを使う場合

③公開されているデータセットを使う場合には、利用条件が設定されている場合が多いです。
例えば、有名な画像のデータセットであるCOCOでは、Creative Commons Attribution 4.0 LicenseのBYという要素が採用されており、原作者のクレジット(氏名、作品タイトルなど)を表示することが要求されています。このようなデータセットの利用条件は、データをダウンロードした時点で利用条件に同意した(=契約した)と解されるため、データ提供者と受領者との間での契約という形で当事者を規律します。契約上の義務として利用条件がかかってくるという構造はNDAとも同じですね(②特定の企業からデータを買ってくる場合も同様です)。
要は、設定している利用条件を検討する必要があるということになります。

④ 不特定多数から収集する場合

④不特定多数から収集する場合には、どういった点が問題になるでしょうか?
例えば、クローリング・スクレイピングにより風景画像を収集して学習に使いたいと思った場合を考えてみましょう。

収集した風景画像はいろんな人が撮影したものなので、著作権が発生しています。そして、学習に使おうとすると、画像をコピーして、リサイズ等の前処理やアノテーションをする必要がありますが、これは複製権や翻案権を侵害しそうです。また、著作権者に許諾を得ようにも誰が著作権者かがわかりません。

でも、実はこのような行為は著作権を侵害せずに行い得ます。
日本の著作権法が「機械学習パラダイス」と呼ばれていたりするのをご存じの方もいらっしゃるかもしれませんが、著作権法30条の4という条文で、機械学習のような情報解析の目的では、一定の場合に著作物を利用できるとされているからです。
この著作権法30条の4は、2019年1月1日施行の改正著作権法で改正されたもので、(改正前から機械学習パラダイスといわれていたのですが)より機械学習パラダイスな条文になっています。

ちなみにABEJAでは、2019年2月にRPAテクノロジーズ様と提携し、改正法で初めて可能になったワンストップのデータセットクローリングサービスを展開しています。サービス開始時のプレスリリースでは、プレスリリースとしては珍しく、かなり詳細に改正法の解説をしました。
著作権法30条の4の内容が気になる方は是非プレスリリースを読んでみてください。

不特定多数人からデータを収集するケースとして、もう一つ、街頭にカメラを設置して人の顔画像を収集する場合を考えてみましょう。カメラから機械的に取得される画像は基本的に著作物ではありませんが、この場合は、個人情報保護法やプライバシー・倫理という別の観点の制約に留意する必要があります。

このように、データの収集や利活用を行う場合には、個人情報保護法などの法令や、プライバシー・倫理といった観点からの検討が必要な場合があります。

(2) まとめ(データの収集・利活用の際のチェックポイント)

  • 契約によってデータ利活用が制限されないか
  • 個人情報保護法などの法令等によりデータ利活用が制限されないか

※ データの中に第三者の著作物が含まれていても、適法に利活用しうる

よくある質問③:AI開発の委託・受託のときのモデルの権利問題がよくわからない

(1) 学習用プログラム・推論プログラムと著作権

モデル開発の委託/受託では、学習用プログラム、推論プログラム、学習済みパラメータといったものについて、開発フェーズで納品されることが多いかと思います。
下図のとおり、学習用プログラム・推論プログラムには著作権が発生しますが、学習済みパラメータにはこういった権利は発生しません。

image.png

この記事では、特に重要な、著作権が生じる学習用プログラム・推論プログラムについて解説をしたいと思います。

納品予定の学習用プログラム・推論プログラムの著作権について契約で定める場合、①著作権をどちらに帰属させるか(権利帰属)、②双方がどういった条件で利用できるようにするか(利用条件)の二段階の検討が必要です。

(2) 著作権の権利帰属

まず、著作権がユーザ・ベンダのどちらに帰属するかを決める必要があります。

ユーザ帰属、ベンダ帰属、ユーザとベンダの共有、といった選択肢があります。
後述の利用条件のところでも述べるとおり、あまり「どちらに帰属させるべき」という定式はありませんが、

  • ベンダがライセンスフィーで利益を得るようなビジネスモデルの場合には著作権の帰属にこだわることが多い
  • ユーザが開発手法等についても主導権をもって開発を進め、ベンダの裁量が狭いような場合にはユーザに著作権を帰属させることも多い
  • ユーザ・ベンダが協業して完成したモデルを売っていくような場合には共有にすることも多い

といった大まかな傾向はあるかもしれません。

ちなみに、権利帰属について契約で何も決めない場合は、基本的には、プログラムを生成したベンダに著作権が帰属します。

(3) 利用条件

所有権の対象(有体物)は、基本的に「一人しか対象物を利用できない」ことになりますが、著作権の対象(無体物)は多数の人が同時に利用可能です。

例えば、家は有体物で所有権の対象とりますが、

  • 所有者が住む
  • 所有者は使わずに賃借人に住まわせる

といった程度の選択肢しかありません。

他方、著作権の対象となるもの(たとえばプログラム)であれば、

  • 著作権者だけが使う
  • 著作権者も使うが、公開して世界中の人にも使わせてあげる。
  • 著作権者は使わず、もっぱら知り合いに使わせてあげる。使うだけでなく、改良することも許可する。知り合いが改良版を公開したいといったので許可する。

といった、様々な使い方があります。

先ほど、著作権は「権利の束」だと説明しました。

image.png

この束のなかで、どの部分を許可するかを自由に決めることができるのです(a)。
例えば、
・複製は自由に許す
・貸与することも許すが、お金の支払いなどの条件を付ける
・翻案(編集)も許すが、編集できる範囲を制限する
・複製・貸与・翻案以外は禁止
といった具合です。

image.png

また、これも重要な視点ですが、著作権者の権利を契約によって制限することもできます(b)。
image.png

著作権者の権利を全部制限し、他方で第三者に広い範囲でライセンスを付与するような場合には、もはやライセンスを付与される第三者が著作権を持っているのとあまり変わらない状態にもなり得ます。

image.png

「権利をどちらに帰属するか」も重要ですが、「利用条件をどのように設定するか」が重要なことがお分かりいただけたでしょうか。

では、利用条件はどういった要素に着目して定めればよいでしょうか。
この点については、経産省のAI契約ガイドラインの次の表が参考になります(AI契約ガイドライン31頁)

image.png

image.png

例えば、
・著作権はベンダ帰属
・ユーザは自己の業務に必要な範囲で、無償・無期限の非独占的な利用の許諾を受ける
・ベンダは基本的に自由に使えるが、ユーザの競合企業である●●社には横展開してはならない
といった条件を定めることが考えられます。

このような条件を決めたら、上記の表を「別紙」として契約書につけて、「別紙の条件で利用します」という趣旨の条項に落とし込めばOKです!(この表をいい感じに完成させて法務にもっていけばいい感じに契約に落とし込んでくれるはずです!)
以下はAI契約ガイドラインを参考にした規定例です(AI契約ガイドライン114~118頁参照)。

第●●条(本件成果物の著作権)
1.本件成果物および本開発遂行に伴い生じた知的財産(以下「本件成果物等」という。)に関する著作権(著作権法第 27 条および第 28 条の権利を含む。)は、ユーザまたは第三者が従前から保有していた著作物の著作権を除き、ベンダに帰属する。
2.ユーザおよびベンダは、本契約に従った本件成果物等の利用について、他の当事者および正当に権利を取得または承継した第三者に対して、著作者人格権を行使しないものとする。

第●●条(本件成果物の利用条件)
ユーザおよびベンダは、本件成果物等について、別紙「利用条件一覧表」記載のとおりの条件で利用できるものとする。同別紙の内容と本契約の内容との間に矛盾がある場合には同別紙の内容が優先するものとする。

「著作者人格権」などの難しい表現は一旦さておき、何となくイメージを持っていただけたのではないでしょうか。

(4) 契約の落とし穴

最後に、契約締結の際に陥りがちな落とし穴を二つほど紹介します。

落とし穴①:「著作権共有」

著作権の帰属について、ユーザもベンダも譲らず決着がつかない…といった場合には、折衷案として著作権をユーザ・ベンダの一方に帰属させず、「共有」とすることもあるかと思います。また、共同開発のような場合にも共有とすることもあるでしょう。

この場合、「共有にしたからお互い自由に使える」と思っていませんか?

著作権法65条1項・2項を見てみましょう。

(共有著作権の行使)
第六十五条 共同著作物の著作権その他共有に係る著作権(以下この条において「共有著作権」という。)については、各共有者は、他の共有者の同意を得なければ、その持分を譲渡し、又は質権の目的とすることができない。
2 共有著作権は、その共有者全員の合意によらなければ、行使することができない。

条文をみると、著作権の行使は「他の共有者の合意」が必要とされています。
つまり、「●●に使っていいよ」という合意を逐一得ないとなにもできない、ということですね。
ですので、共有にする場合には、同じ契約で「●●に使っていいよ」という合意を得ることが重要です。

AI契約ガイドラインの契約書案でも、共有にする場合にはちゃんとこの「合意」についての規定があります(114頁)。
ちょっと長いですが、条文を貼っておきます。

第●●条(本件成果物等の著作権)
1.本件成果物および本開発遂行に伴い生じた知的財産(以下「本件成果物等」という。)に関する著作権(著作権法第 27 条および第 28 条の権利を含む。)は、ユーザのベンダに対する委託料の支払いが完了した時点で、ユーザ、ベンダまたは第三者が従前から保有していた著作物の著作権を除き、ベンダおよびユーザの共有(持分均等)とする。なお、ベンダからユーザへの著作権移転の対価は、委託料に含まれるものとする。
2.前項の場合、ユーザおよびベンダは、共有にかかる著作権につき、本契約に別に定めるところに従い、前項の共有にかかる著作権の行使についての法律上必要とされる共有者の合意を、あらかじめこの契約により与えられるものとし、相手方の同意なしに、かつ、相手方に対する対価の支払いの義務を負うことなく、自ら利用することができるものとする。
3.ユーザ及びベンダは、相手方の同意を得なければ、第 1 項所定の著作権の共有持分を処分することはできないものとする。
4.ユーザおよびベンダは、本契約に従った本件成果物等の利用について、他の当事者および正当に権利を取得または承継した第三者に対して、著作者人格権を行使しないものとする。

落とし穴②:「協議によって定める」

著作権の帰属について、ユーザもベンダも譲らず決着がつかない…という場合には、帰属について定めることをあきらめて「協議によって定める」といった規定がされることもあります。
しかし、これは当事者にとってリスクが残る契約といえます。

著作権の帰属について何も定めなかった場合には、著作権は開発を行った側、すなわちベンダに帰属します。

そうすると、ユーザとしては、「協議によって定める」場合に協議がまとまらないと、権利を得られないことになります。また、利用許諾も得ていないと、せっかくできたモデルを何も使えない、ということになってしまいます。

ベンダとしても、例えばユーザに有償でライセンスして利益を得ようと思っていた場合、「協議」がまとまらないと結局このようなも目論見が外れてしまい、せっかく作ったのにお金にならない、、となりかねません。

したがって、権利帰属をペンディングにして「協議によって定める」ということは避けるべきです。

同様のことは、開発PJ開始時に権利帰属について定めていない場合にも妥当することがあります。
すなわち、アセスメント・PoCと個別に契約を締結して進めてきたものの、開発フェーズに入って初めてモデルの権利帰属を議論したところまとまらない、といった事態が生じうることがあります。
これはどうすれば防げたかというと、アセスメント開始時に、開発PJ全体に適用される「基本契約書」と締結し、そこでモデルの権利帰属について定めておけばよかった、ということになります。
ただ、開発PJ初期で開発対象が明確に決まっていないことも多く、そういった場合に無理して権利帰属の議論をすることは避けた方がよい場合もあり、どういった形の契約を締結するかは案件ごとに判断した方がよいです。

(5) まとめ

  • 学習用プログラム・推論プログラムには著作権が発生する。
  • 権利帰属と利用条件の二つを決める必要がある。
  • 利用条件を考える際には、AI契約ガイドライン31頁の表が参考になる。
  • 著作権を共有にする場合には、行使についての合意をしておく。
  • 「協議によって定める」といった条項は避けた方がよい場合が多い。

最後に

いかがでしたでしょうか?

今回、初めてQiitaに記事を投稿させていただいたのですが、これまでAIの開発契約を数多く見てきた中で、「技術者・ビジネスサイドと法務がもっと理解しあっていればもっとスムーズに契約交渉が進むのに…」と思ったことが数多くありました。また、技術者が法的なところで疑問を持っているが適切な相談相手がまわりにいない、という場面を見かけることもありました。

この記事が、少しでもそのような問題の解決の一助になれば、と思っております!

かなり長文になってしまったので、最後まで読んでくださった方はいるのだろうか。。。という気もしますが、
今後も記事を更新していくかもしれませんので、疑問点などあればお気軽にコメントなどしていただけますと幸いです!

最後までお読みいただき、ありがとうございました。

※※※※※※※
・この記事のうち意見にわたる部分は私の個人的な見解でであり、私が所属する組織の見解ではないことにご留意ください。
・この記事は一般論を述べたものであり、具体的ケースのご判断の際には専門家にご相談ください。
・この記事はわかりやすさに重点を置いたため、厳密には不正確な表現となっている部分もあることにもご留意ください。
※※※※※※※

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

rpycモジュールを使ってMayaを外部のPythonインタプリタから操作する

この記事はTakumi Akashiro ひとり Advent Calendar 2020の23日目の記事です。

始めに

以前Houdiniを触った際、外部のPythonからHoudiniのhouオブジェクトを操作するhrpycモジュールが凄いなあ……1とおもったので、
hrpycを参考にして、Mayaで同じようなものが書けないかなあと思ったので作ってみました。

使い方

  1. pip install rpyc -t <mayaのScriptフォルダ> でrpycをインストールする。
    バイナリは無いのでpython2系なら何でも大丈夫……なはず。
  2. 以下のファイルをMayaのScriptフォルダを保存。
mrpyc.py
#! python2
# encoding: utf-8

import threading

from rpyc.utils import classic
from rpyc.core import SlaveService
from rpyc.utils.server import ThreadedServer

PORT = 18812

def _start_server(port=PORT):
    t = ThreadedServer(
        SlaveService,
        hostname = '0.0.0.0', port = int(port),
        reuse_addr = True, authenticator = None,
        registrar = None, auto_register = False
        )
    t.start()


def start_server(port=PORT):
    thread = threading.Thread(target=lambda: _start_server(port))
    thread.start()
    return thread


def import_remote_maya_module(server="127.0.0.1", port=PORT):
    return classic.connect(server, port).modules['maya']

3.MayaのScriptEditorなどで以下を実行してMayaのサーバーを立てる。

import mrpyc
mrpyc.start_server()

4.外部のインタプリタで以下を実行して、mayaモジュールを取得して、polyCubeを作ってみます。

import mrpyc
maya = mrpyc.import_remote_maya_module()

maya.cmds.polyCube()

image.png

23_01.gif

締め

以外と簡単2にできましたね!

ただ実験した感じ、数分経つと接続が死んだり、Mayaがクラッシュしたりするので、
もうちょっとhrpyc_RemoteHouAttrWrapperを参考にして実装すると安全にできるのかもしれないです。


  1. 凄いなあって思うだけで実際に使ったことは無いですが。 

  2. 自分が触った感じで3時間程度 

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

rpycモジュールを使って外部のPythonインタプリタからMayaを操作する

この記事はTakumi Akashiro ひとり Advent Calendar 2020の23日目の記事です。

始めに

以前Houdiniを触った際、外部のPythonからHoudiniのhouオブジェクトを操作するhrpycモジュールが凄いなあ……1とおもったので、
hrpycを参考にして、Mayaで同じようなものが書けないかなあと思ったので作ってみました。

rpycモジュールとは

Remote Python Call、すなわち別のプロセスのPythonのオブジェクトを使うためのモジュールです。2

呼び出し元はサーバーを立て、呼び出し先にネットワーク経由でオブジェクトのプロキシを提供することで別セッションのPythonのオブジェクトを使えます。
詳細はrpyc · PyPIに任せますが、Houdiniで使うhrpycはrpycを非常に簡素にしたラッパーです。

使い方

  1. pip install rpyc -t <mayaのScriptフォルダ> でrpycをインストールする。
    バイナリは無いのでpython2系なら何でも大丈夫……なはず。
  2. 以下のファイルをMayaのScriptフォルダを保存。
mrpyc.py
#! python2
# encoding: utf-8

import threading

from rpyc.utils import classic
from rpyc.core import SlaveService
from rpyc.utils.server import ThreadedServer

PORT = 18812

def _start_server(port=PORT):
    t = ThreadedServer(
        SlaveService,
        hostname = '0.0.0.0', port = int(port),
        reuse_addr = True, authenticator = None,
        registrar = None, auto_register = False
        )
    t.start()


def start_server(port=PORT):
    thread = threading.Thread(target=lambda: _start_server(port))
    thread.start()
    return thread


def import_remote_maya_module(server="127.0.0.1", port=PORT):
    return classic.connect(server, port).modules['maya']

3.MayaのScriptEditorなどで以下を実行してMayaのサーバーを立てる。

import mrpyc
mrpyc.start_server()

4.外部のインタプリタで以下を実行して、mayaモジュールを取得して、polyCubeを作ってみます。

import mrpyc
maya = mrpyc.import_remote_maya_module()

maya.cmds.polyCube()

image.png

23_01.gif

締め

以外と簡単3にできましたね!

ただ実験した感じ、数分経つと接続が死んだり、Mayaがクラッシュしたりするので、
もうちょっとhrpyc_RemoteHouAttrWrapperを参考にして実装すると安全にできるのかもしれないです。


  1. 凄いなあって思うだけで実際に使ったことは無いですが。 

  2. 諸説ある 

  3. 自分が触った感じだとhrpyc.pyから余分な部分を削ってつくることで、3時間程度で出来た。 

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

性のクリスマスイブに彼女が居ないひとへ ◯ぎ声キーボード

みなさんこんにちは限界開発鯖のやばい人やFascodeNetworkの広報やブログでデブ活などなどをやっているくもことなおこです。
Qiitaで投稿したのが初めてなので若干至らない点があるかもしれませんがご了承ください

作った経緯

早速自分が使いたいがために作った◯ぎ声キーボード、キーを押すたびに◯ぎ声がなるという恐ろしく世紀末的なPythonとVLCを使ったツール?です。

まずは自分が多少わかる言語でバックグラウンド上で動作するキー検知いわゆるキーロガーですね、、、を作らなくちゃいけません

その条件で色々探したところ
Pythonのpyxhookライブラリを使うのが一番楽そう
使い方に関してはここがわかりやすい
https://qiita.com/kaitaku/items/5c327ddd779260c63036

コード

と言っても上の方のソースコードコピーしてちょいといじった感じですね。
キー入力検知したらサウンド鳴らすだけのコードです。

GitHubのコード
https://gist.github.com/naoko1010hh/77d15c8658a4fadfc7e945da80446e22
Screenshot_20201223-210734.png

実際に動かしてみた動画をTwitterに上げているのでもしよければ見てね。ボリューム小さくしてから見たほうがいいかも
https://twitter.com/naoko1010hh/status/1322766477649453057?s=19

動画内で使用した音声はみじんこさんの音声有料素材を使用させていただきました。https://www.dlsite.com/maniax/work/=/product_id/RJ287819.html

最後に

後つけ感はありますが今後の目標はキーごとに音を割り振って支援が必要な方がキー配置を覚えられるようにしたいですね

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

SWIGでC/C++をラップしPythonの処理を高速化する。【概要編】

SWIGとは?

SWIGは、C/C++で書かれたプログラムをラップし、多言語で使えるようにするためのツールです。
サポートされている言語には、Javascript、Perl、PHP、Python、Tcl、Rubyなどのスクリプト言語や、C#、D、Go、Java、Lua、OCaml、Octave、Scilab、Rなどの非スクリプト言語があります。

速度比較

C/C++で書かれたコードが早いのはよく知られていますが、実際にどれくらいの速度差があるのか、いくつかの条件で比較します。

<実行環境>
OS: Ubuntu18.04
CPU: Intel Corei7-7700k
メモリ: 24GB

<コンパイル環境>
-O3オプションあり
並列化なし

Hello World

"hello world!"と1000回表示する関数の実行速度

python_time:2.356291e-03[sec]
swig_time__:1.398325e-03[sec]

2倍程度の差がありますが、一般的に言われているほどの差があるようには見えません。
厳密にはわかりませんが、文字列のコンソール表示速度のオーバーヘッドがあるからだと思われます。


コード
python
def hello_world():
    for i in range(1000):
        print("hello world!")

def test1():
    # python
    start = time.time()
    hello_world()
    python_time = time.time() - start
    # swig
    start = time.time()
    helloWorld()
    swig_time = time.time() - start
    print("python_time:{:e}".format(python_time) + "[sec]")
    print("swig_time__:{:e}".format(swig_time) + "[sec]")
c
void helloWorld()
{
    for (int i = 0; i < 1000; i++)
    {
        printf("hello world!\n");
    }
}


数字のカウントアップ

1000までをカウントアップして返す関数の実行速度

python_time:6.842613e-05[sec]
swig_time__:5.483627e-06[sec]

今度は10倍以上の差が出ました。数字をカウントアップするだけでここまで差が出ると、pythonは遅いといわれる理由が垣間見えてきます。


コード
python
def count_up():
    res = 0
    for i in range(1000):
        res += 1
    return res

def test2():
    # python
    start = time.time()
    res = count_up()
    print(res)
    python_time = time.time() - start
    # swig
    start = time.time()
    res = countUp()
    print(res)
    swig_time = time.time() - start
    print("python_time:{:e}".format(python_time) + "[sec]")
    print("swig_time__:{:e}".format(swig_time) + "[sec]")
c
int countUp()
{
    int res = 0;
    for (int i = 0; i < 1000; i++)
    {
        res += 1;
    }
    return res;
}


画像の変換

グレースケール画像からRGB画像へ変換する関数の実行速度
※Python側ではOpenCVモジュールを使用

python_time:1.032352e-04[sec]
swig_time__:1.156330e-04[sec]

こちらはほとんど変わらない結果になりました。OpenCVはそもそもC/C++で書かれているので、当然といえば当然の結果ですね。
しかし、このSWIGの実行速度には、np.zeros()で出力先のメモリを確保しておく処理が含まれています。OpenCVのpythonパッケージを利用している場合には不可能ですが、SWIGで書いている場合には出力先メモリを再利用することが可能です。
メモリ空間の確保を外だしした場合の速度は以下のようになります。

python_time:1.101494e-04[sec]
swig_time__:7.319450e-05[sec]

この条件だとSWIGが30%以上高速化しています。複数回実行する場合はSWIGでのベタ書きにも優位性がありそうです。


コード
python
def test3():
    img_size = (256, 256)
    org_img = np.random.randint(0, 256, (img_size), dtype=np.uint8)
    # python
    start = time.time()
    res_py = cv2.cvtColor(org_img, cv2.COLOR_GRAY2RGB)
    python_time = time.time() - start
    # swig
    # res_swig = np.zeros((*img_size, 3), dtype=np.uint8)
    start = time.time()
    res_swig = np.zeros((*img_size, 3), dtype=np.uint8)
    imgGray2RGB(org_img, res_swig)
    swig_time = time.time() - start
    print("array_equal: {}".format(np.array_equal(res_py, res_swig)))
    print("python_time:{:e}".format(python_time) + "[sec]")
    print("swig_time__:{:e}".format(swig_time) + "[sec]")
c
void imgGray2RGB(unsigned char *inArr, int inDim1, int inDim2,
                 unsigned char *inplaceArr, int inplaceDim1, int inplaceDim2, int inplaceDim3)
{
    int height = inplaceDim1;
    int width = inplaceDim2;
    int channel = inplaceDim3;
    int h, w;
    int in_point, out_point;

    for (h = 0; h < height; h++)
    {
        for (w = 0; w < width; w++)
        {
            in_point = h * width + w;
            out_point = channel * (h * width + w);
            inplaceArr[out_point] = inArr[in_point];
            inplaceArr[out_point + 1] = inArr[in_point];
            inplaceArr[out_point + 2] = inArr[in_point];
        }
    }
}


(オプション) 画像の正規化

グレースケールからRGBへの変換に加え、画像の正規化を行った実行速度
※Python側ではOpenCVモジュールを使用

python_time:1.460791e-03[sec]
swig_time__:3.521442e-04[sec]

OpenCVもC/C++で実装しているはずですが、SWIGがOpenCVより4倍の速度が出ています。
これは、一度のラスタ走査ですべての処理を終えており、処理の量が大きく削減されているためです。OpenCVのPythonパッケージは当然ながら関数単位でわかれているため、こういった処理の削減で高速化を狙う場合は、C/C++での処理が必須になります。


コード
python
def test4():
    img_size = (256, 256)
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    mean_np = np.array(mean, dtype=np.float32)
    std_np = np.array(std, dtype=np.float32)
    org_img = np.random.randint(0, 256, (img_size), dtype=np.uint8)
    # python
    start = time.time()
    res_py = cv2.cvtColor(org_img, cv2.COLOR_GRAY2RGB).astype(np.float32)
    res_py = ((res_py / 255) - mean_np) / std_np
    python_time = time.time() - start
    # swig
    start = time.time()
    res_swig = np.zeros((*img_size, 3), dtype=np.float32)
    imgNormalize(org_img, res_swig, *mean, *std)
    swig_time = time.time() - start
    print("array_equal: {}".format(np.array_equal(res_py, res_swig)))
    print("python_time:{:e}".format(python_time) + "[sec]")
    print("swig_time__:{:e}".format(swig_time) + "[sec]")
c
void imgNormalize(unsigned char *inArr, int inDim1, int inDim2,
                  float *inplaceArr, int inplaceDim1, int inplaceDim2, int inplaceDim3,
                  float meanR, float meanG, float meanB,
                  float stdR, float stdG, float stdB)
{
    int height = inplaceDim1;
    int width = inplaceDim2;
    int channel = inplaceDim3;
    int h, w;
    int val;
    int inPoint, outPoint;

    for (h = 0; h < height; h++)
    {
        for (w = 0; w < width; w++)
        {
            inPoint = h * width + w;
            outPoint = channel * (w + width * h);
            val = inArr[inPoint];
            inplaceArr[outPoint] = ((float)val / 255 - meanR) / stdR;
            inplaceArr[outPoint + 1] = ((float)val / 255 - meanG) / stdG;
            inplaceArr[outPoint + 2] = ((float)val / 255 - meanB) / stdB;
        }
    }
}


次回

次は実装編になります。実装編では基本的なSWIGの使い方から、Numpyを直接引数へ渡してC/C++側からポインタ参照する方法など、ちょっとした応用を紹介予定です。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

TKinterを利用してパソコン設定変更アプリを作ってみるぞ?

pcname.PNG

pythonの勉強として簡単なソフトを作ってみよう!

なんて意気込みでtkinterで1つのソフトを作ってみます。
とりあえず、Windows上で設定変更をかけるソフト。

名前は「お家に帰ろう」

「設定変更が便利になるといいな」的なコンセプトで作り始めました。

Windowsの設定変更ソフトを作りたい方に、少し参考になる程度の情報が提供できたらいいな!

tkinterとは?

python標準のGUIライブラリで、古い見た目のUIを実装できます。

調べていくと、どうやらtkinterとtkinter.ttkでUIデザインが異なる模様。
ttkの方がモダンなUIになります。

さらに2018年からPySimpleGUIの開発が始まっており、コードも短く、よりモダンなデザインでGUIを組める模様。

今回はNETに情報が多いからtkinterでチャレンジしてみよう!ってわけで作成を開始しました。

作ったもの

ouchi.PNG

こんな感じです。
うん。素人の作品っぽくていいですね 笑

とりあえず下記を実装してみました。
・PC名変更
・IPv4変更
・自動ログオン

作って気づいたこと

ごく当たり前のことで、薄々わかってたんですが、せめて設計書的なものは作りましょう 笑

はじめは「PC名変更」くらいしか考えてなかったのですが、後から実装したい機能がどんどん増えてきてしまいました。。

その都度UIに変更を加えていったので、ここがモーレツに面倒でした。
いや、ほんとアホですよね。うん。
思い付きはダメデス

こんなノリで作り始めましたが、上の画像の機能自体はちゃんと動くようになりました。

Windowsは権限、UACの問題があるので、そのへんの動きをおさえて変更が完了するように作る必要があります。

そのほか、process.runやレジストリ書き込み、入力文字のチェックやログ出力の実装、pyarmor、exe化、モジュール分割などなど入れて、再起動後は自動的にstep2としてプログラムを起動するところまで作ってます。

デバッグやブラッシュアップができればフリーソフト公開なんてできればいいのですが、まだ先かなぁと思ってます。

でも、やっぱり1つのモノを作ることで1連の流れが見えてきますね!
勉強になりました。

tkinterの実装

こんな感じのコードでGUIの実装を始めます。

class Application(tk.Frame):
    def __init__(self, master):
        super().__init__(master)
        self.pack()
        self.master.geometry("620x630")
        self.master.title("お家に帰ろう")
        nb = ttk.Notebook(width=200, height=200)

メニューバーはこんな感じ。

        # メニューバーの作成
        self.menubar = tk.Menu(master)
        master.configure(menu = self.menubar)
        # ファイルメニュー
        filemenu = tk.Menu(self.menubar, tearoff = 0)
        self.menubar.add_cascade(label = "ファイル", menu = filemenu)
        # ~内容
        filemenu.add_command(label = "ファイルから読み込む(開発中)")
        # ~内容
        filemenu.add_command(label = "ファイルに保存する(開発中)")
        # セパレーター
        filemenu.add_separator()
        filemenu.add_command(label = "終了", command = lambda: master.destroy())

タブの実装

        #タブ
        tab1 = tk.Frame(nb)
        tab2 = tk.Frame(nb)
        tab3 = tk.Frame(nb)
        tab4 = tk.Frame(nb)
        self.textbox1 = tk.Text(tab4)
        self.textbox1.configure(font=("", 14, ""))
        self.textbox1.pack()
        self.textbox1.place(x=150, y=0,width = 460, height = 570)
        tab5 = tk.Frame(nb)

ちなみにtab4はtextboxを実装します。

例えばエクセルからパラメータを張り付けて設定変更できると便利だよね?なんて発想。
実装自体は難しくないレベル。

CSVファイルから読み込んでもいいですし、このあたりはどうやってデータを入れるか、運用に合わせて決めるといいですね!

設定変更コマンド

単純にコマンドプロンプトのコマンドをprocess.runで実装できます。ただ、errorlevelを受け取る方法がわかりませんでした。

なので判定を入れ込むならpowershellの方がいいかもしれませんね。

今回はpowershellでコンピュータ名を変更し、戻り値を判定して正常、異常を判定しています。

IP変更

今回はコマンドプロンプトで実装しました。

判定はレジストリからパラメータを取得し、変更後の値と比較すればOK。
単純にipconfigなんて打ってもオフライン時は出力しないので注意が必要。

なお、レジストリを参照すれば「イーサネット」などのインタフェースもリストで出力できますヨ

チェック機能

値のチェックは入力なしや文字数などで判定させています。

ipaddressは以下のようなチェック方法があります。

import ipaddress as ipadd
def chk_ipv4(self,ipaddress):
    try:
        ipadd.ip_address(ipaddress)
        return "ok"
    except ValueError:
        return "ng"

難易度

pythonの初学者にとってちょうどいいくらいのレベルなのかな?と思います。

私自身、python勉強して2ヶ月くらいでつくりました。

ただしWindowsの動きがわかってないと作るのが難しいとは思います。

同じようなソフトを作る方へ

pyinstallerでexe化ができますが、process.runを利用する場合は下記の記事を読んだ方がいいです。とても参照になりました。(ありがとうございます。)
https://sapporo-president.com/archives/15581

普通にexeにすると動作しませんでした。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

TKinterを利用してパソコン設定変更ソフトを作ってみる(初心者)

pcname.PNG

pythonの勉強として簡単なソフトを作ってみよう!

なんて意気込みでtkinterで1つのソフトを作ってみます。
とりあえず、Windows上で設定変更をかけるソフト。

名前は「お家に帰ろう」

「設定変更が便利になるといいな」的なコンセプトで作り始めました。

Windowsの設定変更ソフトを作りたい方に、少し参考になる程度の情報が提供できたらいいな!

tkinterとは?

python標準のGUIライブラリで、古い見た目のUIを実装できます。

調べていくと、どうやらtkinterとtkinter.ttkでUIデザインが異なる模様。
ttkの方がモダンなUIになります。

さらに2018年からPySimpleGUIの開発が始まっており、コードも短く、よりモダンなデザインでGUIを組める模様。

今回はNETに情報が多いからtkinterでチャレンジしてみよう!ってわけで作成を開始しました。

作ったもの

ouchi.PNG

こんな感じです。
うん。素人の作品っぽくていいですね 笑

とりあえず下記を実装してみました。
・PC名変更
・IPv4変更
・自動ログオン

作って気づいたこと

ごく当たり前のことで、薄々わかってたんですが、せめて設計書的なものは作りましょう 笑

はじめは「PC名変更」くらいしか考えてなかったのですが、後から実装したい機能がどんどん増えてきてしまいました。。

その都度UIに変更を加えていったので、ここがモーレツに面倒でした。
いや、ほんとアホですよね。うん。
思い付きはダメデス

こんなノリで作り始めましたが、上の画像の機能自体はちゃんと動くようになりました。

Windowsは権限、UACの問題があるので、そのへんの動きをおさえて変更が完了するように作る必要があります。

そのほか、process.runやレジストリ書き込み、入力文字のチェックやログ出力の実装、pyarmor、exe化、モジュール分割などなど入れて、再起動後は自動的にstep2としてプログラムを起動するところまで作ってます。

デバッグやブラッシュアップができればフリーソフト公開なんてできればいいのですが、まだ先かなぁと思ってます。

でも、やっぱり1つのモノを作ることで1連の流れが見えてきますね!
勉強になりました。

tkinterの実装

こんな感じのコードでGUIの実装を始めます。

class Application(tk.Frame):
    def __init__(self, master):
        super().__init__(master)
        self.pack()
        self.master.geometry("620x630")
        self.master.title("お家に帰ろう")
        nb = ttk.Notebook(width=200, height=200)

メニューバーはこんな感じ。

        # メニューバーの作成
        self.menubar = tk.Menu(master)
        master.configure(menu = self.menubar)
        # ファイルメニュー
        filemenu = tk.Menu(self.menubar, tearoff = 0)
        self.menubar.add_cascade(label = "ファイル", menu = filemenu)
        # ~内容
        filemenu.add_command(label = "ファイルから読み込む(開発中)")
        # ~内容
        filemenu.add_command(label = "ファイルに保存する(開発中)")
        # セパレーター
        filemenu.add_separator()
        filemenu.add_command(label = "終了", command = lambda: master.destroy())

タブの実装

        #タブ
        tab1 = tk.Frame(nb)
        tab2 = tk.Frame(nb)
        tab3 = tk.Frame(nb)
        tab4 = tk.Frame(nb)
        self.textbox1 = tk.Text(tab4)
        self.textbox1.configure(font=("", 14, ""))
        self.textbox1.pack()
        self.textbox1.place(x=150, y=0,width = 460, height = 570)
        tab5 = tk.Frame(nb)

ちなみにtab4はtextboxを実装します。

例えばエクセルからパラメータを張り付けて設定変更できると便利だよね?なんて発想。
実装自体は難しくないレベル。

CSVファイルから読み込んでもいいですし、このあたりはどうやってデータを入れるか、運用に合わせて決めるといいですね!

設定変更コマンド

単純にコマンドプロンプトのコマンドをprocess.runで実装できます。ただ、errorlevelを受け取る方法がわかりませんでした。

なので判定を入れ込むならpowershellの方がいいかもしれませんね。

今回はpowershellでコンピュータ名を変更し、戻り値を判定して正常、異常を判定しています。

IP変更

今回はコマンドプロンプトで実装しました。

判定はレジストリからパラメータを取得し、変更後の値と比較すればOK。
単純にipconfigなんて打ってもオフライン時は出力しないので注意が必要。

なお、レジストリを参照すれば「イーサネット」などのインタフェースもリストで出力できますヨ

チェック機能

値のチェックは入力なしや文字数などで判定させていますが、IPアドレス形式は下記で簡単にチェックできます。

import ipaddress as ipadd
def chk_ipv4(self,ipaddress):
    try:
        ipadd.ip_address(ipaddress)
        return "ok"
    except ValueError:
        return "ng"

難易度

pythonの初学者にとってちょうどいいくらいのレベルなのかな?と思います。

私自身、python勉強して2ヶ月くらいでつくりました。

ただしWindowsの動きがわかってないと作るのが難しいとは思います。

同じようなソフトを作る方へ

pyinstallerでexe化ができますが、process.runを利用する場合は下記の記事を読んだ方がいいです。とても参照になりました。(ありがとうございます。)
https://sapporo-president.com/archives/15581

普通にexeにすると動作しませんでした。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【2021年度版】Pythonインストール Windows10(64bit)編

はじめに

Pythonをインストールする機会が多いのでマニュアル化しました。
本記事はWindows10(64bit)向けです。

Python3.9 インストール

Python公式のダウンロードページ(https://www.python.org/downloads) にアクセスして「Download Python」をクリックします。
1.png

ダウンロードしたexeファイルをクリックします。

ダイアログ画面が起動しましたら「Add Python 3.9 to Path」にチェックを入れて「Install now」をクリックしてインストールを開始します。※チェックは入れることで、環境変数の設定が不要となるため楽です。

2.png

「Setup was successful」と表示されればインストール成功のため「Close」をクリックして閉じます。
3.png

Pythonコマンドのチェック

正常にインストールされたことをチェックするため、コマンドプロンプトを起動して以下コマンドを入力して実行します。

python -V

今回私がダウンロードしたバージョンはPython3.9.1になるため「Python 3.9.1」と表示されれば成功です。
4.png

最後に

特に難しい手順はないですね!読んでいただきありがとうございます。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

OpenCV で動画を加工する

OpenCVで簡単に動画に任意の処理ができます。

1、ビデオを読み込んで書き込む

import cv2
cap = cv2.VideoCapture('./video.mp4') #読み込む動画のパス
fps = cap.get(cv2.CAP_PROP_FPS)

fourcc = cv2.VideoWriter_fourcc('m','p','4', 'v') #mp4フォーマット
video = cv2.VideoWriter('./edited_video.mp4', fourcc, fps, (1920,1080)) #書き込み先のパス、フォーマット、fps、サイズ

avg = None

while True:
    # 1フレームずつ取得する。
    ret, frame = cap.read()
    if not ret:
        break

    #? 任意の処理をここに書く ?

    video.write(frame)

    key = cv2.waitKey(30)
    if key == 27:
        break

cap.release()
video.release()

2、処理を書く

例えば、動画をスケッチ風にするには、
1、の任意の処理のところに以下を入れます。

    #? 任意の処理をここに書く ?
    # 白黒画像に
    grayImage = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # 白黒反転
    grayImageInv = 255 - grayImage

    # ぼかしをかける
    grayImageInv = cv2.GaussianBlur(grayImageInv, (21, 21), 0)

    #blend using color dodge
    output = cv2.divide(grayImage, 255-grayImageInv, scale=256.0)
    output = cv2.cvtColor(output, cv2.COLOR_GRAY2BGR)
    video.write(output)

Dec-23-2020 18-59-56.gif

Dec-23-2020 19-01-12.gif

?


フリーランスエンジニアです。
お仕事のご相談こちらまで
rockyshikoku@gmail.com

Core MLを使ったアプリを作っています。
機械学習関連の情報を発信しています。

Twitter
Medium

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

磁気流体の数値計算で遊ぶ

磁気流体とは

プラズマや液体金属のような電気伝導性の高い流体を扱う流体力学を磁気流体力学 (Magnetohydrodynamics) といいます。英語から略してよくMHDと呼ばれます。
この磁気流体の方程式系は保存形で書くと以下のようになります。

\frac{\partial\mathbf{U}}{\partial t} + \frac{\partial\mathbf{F}}{\partial x} + \frac{\partial\mathbf{G}}{\partial y} + \frac{\partial\mathbf{H}}{\partial z}= \mathbf{0}
\mathbf{U} = \begin{bmatrix}
\rho \\
\rho u \\
\rho v \\
\rho w \\
B_x \\
B_y \\
B_z \\
e
\end{bmatrix},\ 
\mathbf{F} = \begin{bmatrix}
\rho u \\
\rho u^2+p_T-B_x^2 \\
\rho uv-B_xB_y \\
\rho uw-B_xB_z \\
0 \\
B_yu-B_xv \\
B_zu-B_xw \\
\left(e+p_T\right)u-B_x\left(\mathbf{v}\cdot\mathbf{B}\right)
\end{bmatrix},\ 
\mathbf{G} = \begin{bmatrix}
\rho v \\
\rho vu-B_yB_x \\
\rho v^2+p_T-B_y^2 \\
\rho vw-B_yB_z \\
B_xv-B_yu \\
0 \\
B_zv-B_yw \\
\left(e+p_T\right)v-B_y\left(\mathbf{v}\cdot\mathbf{B}\right)
\end{bmatrix},\ 
\mathbf{H} = \begin{bmatrix}
\rho w \\
\rho wu-B_zB_x \\
\rho wv-B_zB_y \\
\rho w^2+p_T-B_z^2 \\
B_xw-B_zu \\
B_yw-B_zv \\
0 \\
\left(e+p_T\right)w-B_z\left(\mathbf{v}\cdot\mathbf{B}\right)
\end{bmatrix}
p = \left(\gamma-1\right)\left(e-\frac{1}{2}\rho|\mathbf{v}|^2-\frac{1}{2}|\mathbf{B}|^2\right),\ p_T = p+\frac{1}{2}|\mathbf{B}|^2

ここで $\rho, p$ は密度・圧力で、$u, v, w$ はそれぞれ $x, y, z$ 方向の速度、$B_x, B_y, B_z$ はそれぞれ $x, y, z$ 方向の磁場、さらに $e$ はエネルギー密度を表しています。$\gamma$ は比熱比です。またここでは $\mu_0, 4\pi$ 等の磁場に関する係数が消える単位系を採用しています。
通常の流体と比べてみると、新たに磁場が変数として加わっていることがわかりますね。

さて、この磁気流体の有名な問題にOrszag-Tang渦問題 (Orszag and Tang, 1979) というものがあります (Google画像検索) 。渦が衝撃波を形成しながら複雑に絡み合った模様が形成され、しかも数値的に発生する磁場発散を適切に除去できないと計算が破綻しやすいことが知られており、磁気流体計算の標準的なテスト問題としてよく用いられています。
具体的な設定は以下の通りです。

\rho = \gamma^2,\ p = \gamma
u = -\sin y,\ v = \sin x,\ w = 0
B_x = -\sin y,\ B_y = \sin 2x,\ B_z = 0
\gamma = 5/3,\ 0\le x \le 2\pi,\ 0\le y \le 2\pi

境界条件は $x, y$ 各方向に周期境界条件を課します。
今回はこれを解くのが目標です。最終的にはこんな図が出来上がります。

orszag_tang_p.png

HLLD近似リーマン解法

ここで一旦簡単のために1次元とします。このとき方程式系は以下のように書けます。

\frac{\partial\mathbf{U}}{\partial t} + \frac{\partial\mathbf{F}}{\partial x} = \mathbf{0}
\mathbf{U} = \begin{bmatrix}
\rho \\
\rho u \\
\rho v \\
\rho w \\
B_y \\
B_z \\
e
\end{bmatrix},\ 
\mathbf{F} = \begin{bmatrix}
\rho u \\
\rho u^2+p_T-B_x^2 \\
\rho uv-B_xB_y \\
\rho uw-B_xB_z \\
B_yu-B_xv \\
B_zu-B_xw \\
\left(e+p_T\right)u-B_x\left(\mathbf{v}\cdot\mathbf{B}\right)
\end{bmatrix}

1次元の場合マクスウェル方程式の

\nabla\cdot\mathbf{B} = 0

から $B_x$ は時間・空間的に常に一定となるので、$B_x$ は方程式の独立変数から外してあります。

さて、上の $\mathbf{F}$ のヤコビアンの固有値を求めることで、この磁気流体には7個の特性速度

\lambda_1 = u-c_{fx}, \lambda_2 = u-c_{ax}, \lambda_3 = u-c_{sx}, \lambda_4 = u, \lambda_5 = u+c_{sx}, \lambda_6 = u+c_{ax},\lambda_7 = u+c_{fx}

があることがわかります (導出はめちゃくちゃ面倒なので省略します)。ここで

a = \sqrt{\frac{\gamma p}{\rho}},\ c_{a} = \frac{|\mathbf{B}|}{\sqrt{\rho}}
c_{ax} = \frac{|B_x|}{\sqrt{\rho}},\ c_{fx,sx} = \left[\frac{a^2+c_a^2\pm\sqrt{\left(a^2+c_a^2\right)^2-4a^2c_{ax}^2}}{2}\right]^{1/2}

で、 $\lambda_2, \lambda_6$ はAlfvén波、$\lambda_1, \lambda_7$ は速進磁気音波 (fast magnetosonic wave)、$\lambda_3, \lambda_5$ は遅進磁気音波 (slow magnetosonic wave)、そして $\lambda_4$ はエントロピー波を表しています。
通常の流体が前後の音波とエントロピー波の合計3種類だったことを考えると、随分波の種類が増えています。うーんこれは大変そう。
何しろ多くの場合無視される遅進磁気音波抜きにしても5個です。これら多種多様な波を的確に捉えて計算するのは、通常の流体と比べてもかなり難易度が高くなります。

そんななか、HLLD近似リーマン解法 (Miyoshi and Kusano, 2005) は精度・計算効率・ロバスト性いずれも優れた解法として広く用いられており、今回はこれを採用したいと思います。

ここで以下のような原点を境に不連続に配置された初期値問題を考えます。

\mathbf{U} = \begin{cases}
\mathbf{U}_L & (x > 0) \\
\mathbf{U}_R & (x < 0)
\end{cases}

これをリーマン問題といい、HLLD法をはじめとする近似リーマン解法系のスキームでは、各セル内では $\mathbf{U}$ が一様になっていると近似して、各セルの境界毎のリーマン問題に帰着させます (下図)。

fig01.png

そこでHLLD法の説明に入る前に、より単純なHLL法を説明します (後でこの結果を使います)。リーマン問題の解は中心からさまざまな波が広がってゆく、という形になりますが、HLL法では左向きの波 $S_L$ と右向きの波 $S_R$ の2種類のみを考えます。さらに2つの波に囲まれた領域 (リーマンファン) 内で $\mathbf{U}$ は一定と近似します。
この2つの波には左右に一番速い速進磁気音波を選び、その速さは

S_L = \mathrm{min}\left(u_L-{c_{fx}}_L, u_R-{c_{fx}}_R\right), S_R = \mathrm{max}\left(u_L+{c_{fx}}_L, u_R+{c_{fx}}_R\right)

または

S_L = \mathrm{min}\left(u_L, u_R\right)-\mathrm{max}\left({c_{fx}}_L, {c_{fx}}_R\right), S_R = \mathrm{max}\left(u_L, u_R\right)+\mathrm{max}\left({c_{fx}}_L, {c_{fx}}_R\right)

などと求めます。
方程式を $x-t$ 平面内でグリーンの定理を使って積分すると

\iint_S\left(\frac{\partial\mathbf{U}}{\partial t}+\frac{\partial\mathbf{F}}{\partial x}\right)dxdt = \oint_c\left(-\mathbf{U}dx+\mathbf{F}dt\right) = \mathbf{0}

となるので、下図積分路①について、

-S_R\mathbf{U}_R+S_L\mathbf{U}_L+\mathbf{F}_R-\mathbf{F}_L+\left(S_R-S_L\right)\mathbf
{U}^* = \mathbf{0}

積分路②について、

-S_R\mathbf{U}_R+\mathbf{F}_R-\mathbf{F}_{HLL}+S_R\mathbf
{U}^* = \mathbf{0}

したがって、中間状態の $\mathbf{U}^*$ と $\mathbf{F}_{HLL}$ は、

\begin{aligned}
\mathbf{U}^* &= \frac{S_R\mathbf{U}_R-S_L\mathbf{U}_L-\mathbf{F}_R+\mathbf{F}_L}{S_R-S_L} \\
\mathbf{F}_{HLL} &= \mathbf{F}_R+S_R\left(\mathbf{U}^*-\mathbf{U}_R\right) \\
&= \frac{S_R\mathbf{F}_L-S_L\mathbf{F}_R+S_R S_L\left(\mathbf{U}_R-\mathbf{U}_L\right)}{S_R-S_L}
\end{aligned}

と得られます。

ここでは一旦 $S_L < 0 < S_R$ の場合を考えていますが、もし $S_L > 0$ の場合には $\mathbf{F}_L$ を、$S_R < 0$ の場合には $\mathbf{F}_R$ をそれぞれセル境界でのフラックスとすればOKです。

fig02.png

次に以上の方法を拡張してHLLD法を導出してみましょう。HLLD法ではリーマンファン内で $u$ が一定と仮定します。これによって $p_T$ も一定となり、遅進磁気音波が除外されます。なのでこの場合リーマン問題の解は、下図のように

  • 速進磁気音波より速い領域 × 2
  • 速進磁気音波とAlfvén波の間の領域 × 2
  • Alfvén波とエントロピー波の間の領域 × 2

の6領域に分かれます。

fig03.png

ここから領域内の $\mathbf{U}$ を次々求めてゆきましょう。
まず前提から以下が成り立ちます。

u^*_L = u^{**}_L = u^{**}_R = u^*_R \equiv S_M
{p_T}^*_L = {p_T}^{**}_L = {p_T}^{**}_R = {p_T}^*_R \equiv {p_T}^*

$S_M$ はHLL法で得られた $\rho_{HLL}$ と $\left(\rho u\right)_{HLL}$ を使って以下のように求めます。

S_M = \frac{\left(\rho u\right)_{HLL}}{\rho_{HLL}} = \frac{\left(S_R-u_R\right)\rho_R u_R-\left(S_L-u_L\right)\rho_L u_L-{p_T}_R+{p_T}_L}{\left(S_R-u_R\right)\rho_R-\left(S_L-u_L\right)\rho_L}

速進磁気音波に対するジャンプ条件

まず速進磁気音波の前後で値がどのように変わるか見てみましょう。
そのために積分路①を下図のように半分ずつ $\mathbf{U}_R$ の領域と $\mathbf{U}^*_R$ の領域が入るようにとります。

fig04.png

このとき

S_R\mathbf{U}_R-\mathbf{F}\left(\mathbf{U}_R\right) = S_R\mathbf{U}^*_R-\mathbf{F}\left(\mathbf{U}^*_R\right)

で、これを具体的に成分で書くと以下のようになります。

S_R\begin{bmatrix}
\rho_R \\
\rho_R u_R \\
\rho_R v_R \\
\rho_R w_R \\
{B_y}_R \\
{B_z}_R \\
e_R
\end{bmatrix}
-
\begin{bmatrix}
\rho_R u_R \\
\rho_R u_R^2+{p_T}_R-B_x^2 \\
\rho_R u_R v_R-B_x{B_y}_R \\
\rho_R u_R w_R-B_x{B_z}_R \\
{B_y}_R u_R-B_x v_R \\
{B_z}_R u_R-B_x w_R \\
\left(e_R+{p_T}_R\right)u_R-B_x \left(\mathbf{v}_R\cdot\mathbf{B}_R\right)
\end{bmatrix}
=
S_R\begin{bmatrix}
\rho^*_R \\
\rho^*_R S_M \\
\rho^*_R v^*_R \\
\rho^*_R w^*_R \\
{B_y}^*_R \\
{B_z}^*_R \\
e^*_R
\end{bmatrix}
-
\begin{bmatrix}
\rho^*_R S_M \\
\rho^*_R S_M^2+{p_T}^*-B_x^2 \\
\rho^*_R S_M v^*_R-B_x{B_y}^*_R \\
\rho^*_R S_M w^*_R-B_x{B_z}^*_R \\
{B_y}^*_R S_M-B_x v^*_R \\
{B_z}^*_R S_M-B_x w^*_R \\
\left(e^*_R+{p_T}^*\right)S_M-B_x \left(\mathbf{v}^*_R\cdot\mathbf{B}^*_R\right)
\end{bmatrix}

左側についても同様に考えることで、ここから $\mathbf{U}^*_\alpha$ が以下のように求められます。$\alpha$ はLまたはRという意味です。

\begin{aligned}
\rho^*_\alpha &= \rho_\alpha\frac{S_\alpha-u_\alpha}{S_\alpha-S_M} \\
v^*_\alpha &= v_\alpha-B_x {B_y}_\alpha\frac{S_M-u_\alpha}{\rho_\alpha\left(S_\alpha-u_\alpha\right)\left(S_\alpha-S_M\right)-B_x^2} \\
w^*_\alpha &= w_\alpha-B_x {B_z}_\alpha\frac{S_M-u_\alpha}{\rho_\alpha\left(S_\alpha-u_\alpha\right)\left(S_\alpha-S_M\right)-B_x^2} \\
{B_y}^*_\alpha &= {B_y}_\alpha\frac{\rho_\alpha\left(S_\alpha-u_\alpha\right)^2-B_x^2}{\rho_\alpha\left(S_\alpha-u_\alpha\right)\left(S_\alpha-S_M\right)-B_x^2} \\
{B_z}^*_\alpha &= {B_z}_\alpha\frac{\rho_\alpha\left(S-u_\alpha\right)^2-B_x^2}{\rho_\alpha\left(S_\alpha-u_\alpha\right)\left(S_\alpha-S_M\right)-B_x^2}
\end{aligned}
\begin{aligned}
p_T^* &= {p_T}_L+\rho_L\left(S_L-u_L\right)\left(S_M-u_L\right) \\
&= {p_T}_R+\rho_R\left(S_R-u_R\right)\left(S_M-u_R\right) \\
&= \frac{\left(S_R-u_R\right)\rho_R{p_T}_L-\left(S_L-u_L\right)\rho_L{p_T}_R+\rho_L\rho_R\left(S_R-u_R\right)\left(S_L-u_L\right)\left(u_R-u_L\right)}{\left(S_R-u_R\right)\rho_R-\left(S_L-u_L\right)\rho_L}
\end{aligned}
e^*_\alpha = \frac{\left(S_\alpha-u_\alpha\right)e_\alpha-{p_T}_\alpha u_\alpha+p_T^*S_M+B_x\left(\mathbf{v}_\alpha\cdot\mathbf{B}_\alpha-\mathbf{v}^*_\alpha\cdot\mathbf{B}^*_\alpha\right)}{S_\alpha-S_M}

Alfvén波に対するジャンプ条件

次にAlfvén波の前後の変化を見るために、積分路②を下図のように半分ずつ $\mathbf{U}^*_R$ の領域と $\mathbf{U}^{**}_R$ の領域が入るようにとります。

fig05.png

ここで $S^{*}_L$ と $S^{*}_R$ は

S^*_L = S_M-\frac{|B_x|}{\sqrt{\rho^*_L}},\ S^*_R = S_M+\frac{|B_x|}{\sqrt{\rho^*_R}}

ととります。積分路①の場合と同様にして、

S^*_R\mathbf{U}^*_R-\mathbf{F}\left(\mathbf{U}^*_R\right) = S^*_R\mathbf{U}^{**}_R-\mathbf{F}\left(\mathbf{U}^{**}_R\right)
S^*_R\begin{bmatrix}
\rho^*_R \\
\rho^*_R S_M \\
\rho^*_R v^*_R \\
\rho^*_R w^*_R \\
{B_y}^*_R \\
{B_z}^*_R \\
e^*_R
\end{bmatrix}
-
\begin{bmatrix}
\rho^*_R S_M \\
\rho^*_R S_M^2+{p_T}^*-B_x^2 \\
\rho^*_R S_M v^*_R-B_x{B_y}^*_R \\
\rho^*_R S_M w^*_R-B_x{B_z}^*_R \\
{B_y}^*_R S_M-B_x v^*_R \\
{B_z}^*_R S_M-B_x w^*_R \\
\left(e^*_R+{p_T}^*\right)S_M-B_x \left(\mathbf{v}^*_R\cdot\mathbf{B}^*_R\right)
\end{bmatrix}
=
S^*_R\begin{bmatrix}
\rho^{**}_R \\
\rho^{**}_R S_M \\
\rho^{**}_R v^{**}_R \\
\rho^{**}_R w^{**}_R \\
{B_y}^{**}_R \\
{B_z}^{**}_R \\
e^{**}_R
\end{bmatrix}
-
\begin{bmatrix}
\rho^{**}_R S_M \\
\rho^{**}_R S_M^2+{p_T}^*-B_x^2 \\
\rho^{**}_R S_M v^{**}_R-B_x{B_y}^{**}_R \\
\rho^{**}_R S_M w^{**}_R-B_x{B_z}^{**}_R \\
{B_y}^{**}_R S_M-B_x v^{**}_R \\
{B_z}^{**}_R S_M-B_x w^{**}_R \\
\left(e^{**}_R+{p_T}^*\right)S_M-B_x \left(\mathbf{v}^{**}_R\cdot\mathbf{B}^{**}_R\right)
\end{bmatrix}

となります。ここからわかるのは以下の $\rho^{**}_\alpha$ だけです。

\rho^{**}_\alpha = \rho^*_\alpha

エントロピー波に対するジャンプ条件

さらに下図積分路③を考えます。

fig06.png

S_M\mathbf{U}^{**}_L-\mathbf{F}\left(\mathbf{U}^{**}_L\right) = S_R\mathbf{U}^{**}_R-\mathbf{F}\left(\mathbf{U}^{**}_R\right)
S_M\begin{bmatrix}
\rho^*_L \\
\rho^*_L S_M \\
\rho^*_L v^{**}_L \\
\rho^*_L w^{**}_L \\
{B_y}^{**}_L \\
{B_z}^{**}_L \\
e^{**}_L
\end{bmatrix}
-
\begin{bmatrix}
\rho^*_L S_M \\
\rho^*_L S_M^2+{p_T}^*-B_x^2 \\
\rho^*_L S_M v^{**}_L-B_x{B_y}^{**}_L \\
\rho^*_L S_M w^{**}_L-B_x{B_z}^{**}_L \\
{B_y}^{**}_L S_M-B_x v^{**}_L \\
{B_z}^{**}_L S_M-B_x w^{**}_L \\
\left(e^{**}_L+{p_T}^*\right)S_M-B_x \left(\mathbf{v}^{**}_L\cdot\mathbf{B}^{**}_L\right)
\end{bmatrix}

=
S_M\begin{bmatrix}
\rho^*_R \\
\rho^*_R S_M \\
\rho^*_R v^{**}_R \\
\rho^*_R w^{**}_R \\
{B_y}^{**}_R \\
{B_z}^{**}_R \\
e^{**}_R
\end{bmatrix}
-
\begin{bmatrix}
\rho^*_R S_M \\
\rho^*_R S_M^2+{p_T}^*-B_x^2 \\
\rho^*_R S_M v^{**}_R-B_x{B_y}^{**}_R \\
\rho^*_R S_M w^{**}_R-B_x{B_z}^{**}_R \\
{B_y}^{**}_R S_M-B_x v^{**}_R \\
{B_z}^{**}_R S_M-B_x w^{**}_R \\
\left(e^{**}_R+{p_T}^*\right)S_M-B_x \left(\mathbf{v}^{**}_R\cdot\mathbf{B}^{**}_R\right)
\end{bmatrix}

ここから以下のように $v^{**}$, $w^{**}$, ${B_y}^{**}$, ${B_z}^{**}$ が左右で等しいことが示されます。

\begin{aligned}
v^{**}_L &= v^{**}_R \equiv v^{**} \\
w^{**}_L &= w^{**}_R \equiv w^{**} \\
{B_y}^{**}_L &= {B_y}^{**}_R \equiv {B_y}^{**} \\
{B_z}^{**}_L &= {B_z}^{**}_R \equiv {B_z}^{**}
\end{aligned}

これを踏まえ、今度は下図積分路④で積分します。

fig07.png

\begin{aligned}
\mathbf{0} &= -S_R\mathbf{U}_R+S_L\mathbf{U}_L+\mathbf{F}_R-\mathbf{F}_L+\left(S_R-S^*_R\right)\mathbf{U}^*_R+\left(S^*_L-S_L\right)\mathbf{U}^*_L+\left(S^*_R-S_M\right)\mathbf{U}^{**}_R+\left(S_M-S^*_L\right)\mathbf{U}^{**}_L \\
&= -S^*_R\mathbf{U}^*_R+S^*_L\mathbf{U}^*_L+\mathbf{F}^*_R-\mathbf{F}^*_L+\left(S^*_R-S_M\right)\mathbf{U}^{**}_R+\left(S_M-S^*_L\right)\mathbf{U}^{**}_L \\
&= -\left(S_M+\frac{|B_x|}{\sqrt{\rho^*_R}}\right)\mathbf{U}^*_R+\left(S_M-\frac{|B_x|}{\sqrt{\rho^*_L}}\right)\mathbf{U}^*_L+\mathbf{F}^*_R-\mathbf{F}^*_L+\frac{|B_x|}{\sqrt{\rho^*_R}}\mathbf{U}^{**}_R+\frac{|B_x|}{\sqrt{\rho^*_L}}\mathbf{U}^{**}_L
\end{aligned}

ここから、

\begin{aligned}
v^{**} &= \frac{\sqrt{\rho^*_L}v^*_L+\sqrt{\rho^*_R}v^*_R+\left({B_y}^*_R-{B_y}^*_L\right)\mathrm{sign}\left(B_x\right)}{\sqrt{\rho^*_L}+\sqrt{\rho^*_R}} \\
w^{**} &= \frac{\sqrt{\rho^*_L}w^*_L+\sqrt{\rho^*_R}w^*_R+\left({B_z}^*_R-{B_z}^*_L\right)\mathrm{sign}\left(B_x\right)}{\sqrt{\rho^*_L}+\sqrt{\rho^*_R}} \\
B_y^{**} &= \frac{\sqrt{\rho^*_L}{B_y}^*_R+\sqrt{\rho^*_R}{B_y}^*_L+\sqrt{\rho^*_L\rho^*_R}\left(v^*_R-v^*_L\right)\mathrm{sign}\left(B_x\right)}{\sqrt{\rho^*_L}+\sqrt{\rho^*_R}} \\
B_z^{**} &= \frac{\sqrt{\rho^*_L}{B_z}^*_R+\sqrt{\rho^*_R}{B_z}^*_L+\sqrt{\rho^*_L\rho^*_R}\left(w^*_R-w^*_L\right)\mathrm{sign}\left(B_x\right)}{\sqrt{\rho^*_L}+\sqrt{\rho^*_R}}
\end{aligned}

最後にAlfvén波に対するジャンプ条件をもう一度使うことで、以下が得られます。

e^{**}_\alpha = e^*_\alpha\mp\sqrt{\rho^*_\alpha}\left(\mathbf{v}^*_\alpha\cdot\mathbf{B}^*_\alpha-\mathbf{v}^{**}\cdot\mathbf{B}^{**}\right)\mathrm{sign}\left(B_x\right)

複号はLのときマイナス、Rのときプラスをとります。

長くなりましたが、これで各領域の $\mathbf{U}$ がすべて求まりました。
以上をまとめますと最終的に以下の式になります。

\mathbf{F}_{HLLD} = \begin{cases}
\mathbf{F}_L & (0 < S_L) \\
\mathbf{F}_L^* & (S_L \le 0 < S_L^*) \\
\mathbf{F}_L^{**} & (S_L^* \le 0 < S_M) \\
\mathbf{F}_R^{**} & (S_M \le 0 < S_R^*) \\
\mathbf{F}_R^* & (S_R^* \le 0 < S_R) \\
\mathbf{F}_R & (S_R < 0)
\end{cases}

多次元化

多次元化を行うためには、単にHLLD法を $x,y,z$ 方向それぞれに適用すればよさそうな気がしますが、実際にはそう単純にはいかないところが磁気流体計算の難しさの一つです。
ここで問題になるのが磁場の発散です。本来ならマクスウェル方程式から磁場は常に非発散、つまり

\nabla\cdot\mathbf{B} = 0

でなければなりませんが、数値計算の場合誤差によってこれが破れることがあり、時には計算を破綻させてしまうほどの悪影響を及ぼします。これを回避する方法としては、大きく分けて

  • 配置等を工夫することで磁場発散を初めから発生させない方法
  • 方程式に変更を加えることで発生した磁場発散を抑制する方法

の2種類があります。前者はCT法が有名です。後者ではプロジェクション法や9 wave法が知られており、ここでは実装の比較的単純な9 wave法を紹介します。

9 wave法

Dedner et al. (2002) で提案された方法で、$\psi$ という新しい変数を導入して以下のように方程式を拡張します。

\frac{\partial\mathbf{U}}{\partial t} + \frac{\partial\mathbf{F}}{\partial x} + \frac{\partial\mathbf{G}}{\partial y} + \frac{\partial\mathbf{H}}{\partial z} = \mathbf{S}
\mathbf{U} = \begin{bmatrix}
\rho \\
\rho u \\
\rho v \\
\rho w \\
B_x \\
B_y \\
B_z \\
e \\
\psi
\end{bmatrix},\ 
\mathbf{F} = \begin{bmatrix}
\rho u \\
\rho u^2+p_T-B_x^2 \\
\rho uv-B_xB_y \\
\rho uw-B_xB_z \\
\psi \\
B_yu-B_xv \\
B_zu-B_xw \\
\left(e+p_T\right)u-B_x\left(\mathbf{v}\cdot\mathbf{B}\right) \\
c_h^2 B_x
\end{bmatrix},\ 
\mathbf{G} = \begin{bmatrix}
\rho v \\
\rho vu-B_yB_x \\
\rho v^2+p_T-B_y^2 \\
\rho vw-B_yB_z \\
B_xv-B_yu \\
\psi \\
B_zv-B_yw \\
\left(e+p_T\right)v-B_y\left(\mathbf{v}\cdot\mathbf{B}\right) \\
c_h^2 B_y
\end{bmatrix},\ 
\mathbf{H} = \begin{bmatrix}
\rho w \\
\rho wu-B_zB_x \\
\rho wv-B_zB_y \\
\rho w^2+p_T-B_z^2 \\
B_xw-B_zu \\
B_yw-B_zv \\
\psi \\
\left(e+p_T\right)w-B_z\left(\mathbf{v}\cdot\mathbf{B}\right) \\
c_h^2 B_z
\end{bmatrix},\ 
\mathbf{S} = \begin{bmatrix}
0 \\
0 \\
0 \\
0 \\
0 \\
0 \\
0 \\
0 \\
-\frac{c_h^2}{c_p^2}\psi
\end{bmatrix}

この $\psi$ は磁場発散を抑制するために人工的に導入された変数なので、何か物理的な意味があるわけではありません。

セル境界での $B_x$ と $\psi$ は以下のようにして求めます。

\begin{aligned}
{B_x}_M &= {B_x}_L+\frac{1}{2}\left({B_x}_R-{B_x}_L\right)-\frac{1}{2c_h}\left(\psi_R-\psi_L\right) \\
\psi_M &= \psi_L+\frac{1}{2}\left(\psi_R-\psi_L\right)-\frac{c_h}{2}\left({B_x}_R-{B_x}_L\right)
\end{aligned}

実際の計算においては $\psi$ のソース項をそのまま計算するのではなく、一旦ソース項を忘れて他変数と同様にフラックスの差から $\psi^*$ を求めた後、以下のようにして $\psi^{n+1}$ を求めるのが一般的です。

\psi^{n+1} = \psi^*\exp\left(-\Delta t_n\frac{c_h^2}{c_p^2}\right)

なおここで出てくる $c_h$ と $c_p$ は、$\nabla\cdot\mathbf{B}$ および $\psi$ に対する人工的な伝播と減衰の速さを表すフリーパラメータになります。
$c_h$ としてはCFL条件によって許される最大速度、

c_h = \mathrm{CFL}\cdot\mathrm{min}\left(\Delta x, \Delta y, \Delta z\right)/\Delta t_n

$c_p$ としては $c_r = c_p^2/c_h$ が定数となるようにした上で、$c_r = 0.18$ を選ぶのが空間解像度によらず最適となるようです。

高精度化

$\mathbf{F}_{i+1/2}$ を求める際、$\mathbf{U}_L$ として $\mathbf{U}_i$ を、$\mathbf{U}_R$ として $\mathbf{U}_{i+1}$ を選ぶのはいわゆる風上差分に相当し、1次の空間精度になります。
後で図をお見せしますが、実はこれ、数値拡散が大きくかなりぼやけた解になります。そこでここでは空間精度を上げる方法を紹介します。
また時間精度についても、常微分方程式でRunge-Kutta法を用いることで精度が上げられたように、偏微分方程式でも同様のことを考えることができます。

(そろそろ長くなって疲れてきました。。。)

空間の高精度化

${\mathbf{U}_{i+1/2}}_L$ として例えば

{\mathbf{U}_{i+1/2}}_L = \frac{-\mathbf{U}_{i-1}+4\mathbf{U}_i+\mathbf{U}_{i+1}}{4}

を使うようにすれば空間2次精度が得られますが、このように $\mathbf{U}$ を単純に線形に組み合わせるだけだと数値振動が発生してしまいあまり好ましくありません。
そこでこれを回避するために様々な非線形スキームが考案されています。ここでは2次精度のMUSCLと5次精度のMP5を簡単に紹介します。

MUSCL

van Leer (1979) で提案された手法で、以下のようにリミッターを導入することで基本は2次精度を保ちつつ、不連続面では数値振動が発生しないように1次精度に落とすことができます。

\begin{aligned}
{\mathbf{U}_{i+1/2}}_L &= \mathbf{U}_i+\frac{1}{2}\Delta_i \\
{\mathbf{U}_{i-1/2}}_R &= \mathbf{U}_i-\frac{1}{2}\Delta_i
\end{aligned}
\Delta_i = \mathrm{limiter}\left(\mathbf{U}_{i+1}-\mathbf{U}_i, \mathbf{U}_i-\mathbf{U}_{i-1}\right)

リミッターとしては以下のminmod (Roe, 1986) やMC (van Leer, 1977) がよく使われるようです。

\begin{aligned}
\mathrm{minmod}\left(a, b\right) &= \frac{1}{2}\left[\mathrm{sign}\left(a\right)+\mathrm{sign}\left(b\right)\right]\mathrm{min}\left(|a|, |b|\right) \\
\mathrm{MC}\left(a, b\right) &= \frac{1}{2}\left[\mathrm{sign}\left(a\right)+\mathrm{sign}\left(b\right)\right]\mathrm{min}\left(2|a|, \frac{|a+b|}{2}, 2|b|\right)
\end{aligned}

MP5

Suresh and Huynh (1997) による方法です。まず ${\mathbf{U}_{i+1/2}}_L$ を5次精度の線形スキームで補間します。

{\mathbf{U}_{i+1/2}}^*_L = \frac{2\mathbf{U}_{i-2}-13\mathbf{U}_{i-1}+4\mathbf{U}_i+47\mathbf{U}_{i+1}-3\mathbf{U}_{i+2}}{60}

もちろんこのままでは数値振動が発生してしまうため、ある下限値 $\mathbf{U}_{\mathrm{min}}$ と上限値 $\mathbf{U}_{\mathrm{max}}$ を定め、それらの中央値を使って ${\mathbf{U}_{i+1/2}}_L$ を求めます。

{\mathbf{U}_{i+1/2}}_L = \mathrm{median}\left({\mathbf{U}_{i+1/2}}^*_L, \mathbf{U}_{\mathrm{min}}, \mathbf{U}_{\mathrm{max}}\right)

具体的な $\mathbf{U}_{\mathrm{min}}$ と $\mathbf{U}_{\mathrm{max}}$ の求め方は結構長いのでここでは省略します。

時間の高精度化

\frac{d\mathbf{U}}{dt} = L\left(\mathbf{U}\right)

と書けるとき、以下のように解くのはいわゆるオイラー法に相当し、1次の時間精度になります。

\mathbf{U}^{n+1} = \mathbf{U}^n+\Delta tL\left(\mathbf{U}^n\right)

しかしこれもRunge-Kutta法を使えば精度を上げることができます。
今回はRunge-Kutta法のうちでも、特に余分な数値振動を発生させないという意味で性質の良いSSPRK (strong stability preserving Runge-Kutta) シリーズ (Shu and Osher, 1988) を載せておきます。

2次精度 SSPRK (2, 2)

\begin{aligned}
\mathbf{U}^{\left(1\right)} &= \mathbf{U}^n+\Delta tL\left(\mathbf{U}^n\right) \\
\mathbf{U}^{n+1} &= \frac{1}{2}\mathbf{U}^n+\frac{1}{2}\mathbf{U}^{\left(1\right)}+\frac{1}{2}\Delta tL\left(\mathbf{U}^{\left(1\right)}\right)
\end{aligned}

3次精度 SSPRK (3, 3)

\begin{aligned}
\mathbf{U}^{\left(1\right)} &= \mathbf{U}^n+\Delta tL\left(\mathbf{U}^n\right) \\
\mathbf{U}^{\left(2\right)} &= \frac{3}{4}\mathbf{U}^n+\frac{1}{4}\mathbf{U}^{\left(1\right)}+\frac{1}{4}\Delta tL\left(\mathbf{U}^{\left(1\right)}\right) \\
\mathbf{U}^{n+1} &= \frac{1}{3}\mathbf{U}^n+\frac{2}{3}\mathbf{U}^{\left(2\right)}+\frac{2}{3}\Delta tL\left(\mathbf{U}^{\left(2\right)}\right)
\end{aligned}

結果

以上を踏まえて、MP5およびSSPRK (3, 3)を用いて計算した結果が以下になります。以下はすべて分割数 $N_x\times N_y = 200\times 200$, $\mathrm{CFL} = 0.4$ で計算しています。

orszag_tang_p.gif

上は圧力のプロットです。折り畳んだところを開けばすべての変数が見られます。


動画
  • 圧力
    orszag_tang_p.gif

  • 密度
    orszag_tang_r.gif

  • $u$
    orszag_tang_u.gif

  • $v$
    orszag_tang_v.gif

  • $B_x$
    orszag_tang_bx.gif

  • $B_y$
    orszag_tang_by.gif

  • $\psi$
    orszag_tang_ps.gif



画像
  • 圧力
    orszag_tang_p.png

  • 密度
    orszag_tang_r.png

  • $u$
    orszag_tang_u.png

  • $v$
    orszag_tang_v.png

  • $B_x$
    orszag_tang_bx.png

  • $B_y$
    orszag_tang_by.png

  • $\psi$
    orszag_tang_ps.png

良い感じ!

スキームによる違い

せっかくなのでスキームによる違いも見てみましょう。以下の3種類を比較してみます。

  • 風上差分・オイラー法 (空間1次・時間1次)
  • MUSCL-minmod・SSPRK (2, 2) (空間2次・時間2次)
  • MP5・SSPRK (3, 3) (空間5次・時間3次)

fig08.png

こうしてみると歴然とした違いがありますね。特に風上差分・オイラー法は同じ空間解像度のはずなのに随分ぼやけてしまっています。

磁場発散処理の有無による違い

次に磁場発散処理を入れなかったらどうなるか見てみます。
両者の違いが一番見やすかったMUSCL-minmod・SSPRK (2, 2)で比較します。

...

...

磁場発散処理を入れないと破綻してしまいました。。。

とりあえず計算が壊れる直前の図を見比べてみます。

fig09.png

左にはいかにもヤバそうな、怪しい縞模様が見えますね。
$|\nabla\cdot\mathbf{B}|$ をプロットしてみると、ちょうどそこで大きな磁場発散が生じてしまっていることがわかります。一方で右ではうまく抑えられていますね。

fig10.png

ソースコード

現時点でC++とPythonの2バージョンを作成しており、GutHubにて公開しています。
C++バージョンではOpenMPによる並列化を施しているので、$N_x\times N_y = 200\times 200$ のような大きなサイズの計算に適しています。なお、constexpr if文構造化束縛を使っているのでコンパイルにはC++17以上が必要です。
一方Pythonバージョンではその場ですぐプロットするように作ったので、サクッと結果を見たい場合はこちらが良いかと思います。Pythonといってもfor文を排除してできるだけNumPyに計算を任せるようにしているので、そこそこのパフォーマンスになってくれる、はず。。。
長くなるのでここではPythonバージョンのみ載せておきます。


Pythonバージョンのソースコード
main.py
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm

# グリッド数
#XN = 200; YN = 200
XN = 100; YN = 100
# ファイル出力回数
PN = 100
# 保存変数の個数
VN = 9
# 計算領域のサイズ
XL = 2.0*np.pi; YL = 2.0*np.pi
TL = np.pi
# グリッドサイズ
dx = XL/XN; dy = YL/YN
# CFL数
CFL = 0.4
# 比熱比
gam = 5.0/3.0
# 磁場発散抑制のためのパラメータ (Dedner et al., 2002)
cr = 0.18

# MINMOD関数
def minmod(a, b):
    return 0.5*(np.sign(a)+np.sign(b))*np.minimum(np.abs(a), np.abs(b))
# 中央値
def median(a, b, c):
    return a+minmod(b-a, c-a)
# x方向フラックス
def mhdfx(rho, u, v, w, bx, by, bz, e, psi, pt, ch):
    fx = np.zeros((XN, YN, VN))
    fx[:, :, 0] = rho*u
    fx[:, :, 1] = rho*u*u+pt-bx*bx
    fx[:, :, 2] = rho*v*u-bx*by
    fx[:, :, 3] = rho*w*u-bx*bz
    fx[:, :, 4] = psi
    fx[:, :, 5] = by*u-bx*v
    fx[:, :, 6] = bz*u-bx*w
    fx[:, :, 7] = (e+pt)*u-bx*(u*bx+v*by+w*bz)
    fx[:, :, 8] = ch*ch*bx
    return fx
# y方向フラックス
def mhdfy(rho, u, v, w, bx, by, bz, e, psi, pt, ch):
    fy = np.zeros((XN, YN, VN))
    fy[:, :, 0] = rho*v
    fy[:, :, 1] = rho*u*v-by*bx
    fy[:, :, 2] = rho*v*v+pt-by*by
    fy[:, :, 3] = rho*w*v-by*bz
    fy[:, :, 4] = bx*v-by*u
    fy[:, :, 5] = psi
    fy[:, :, 6] = bz*v-by*w
    fy[:, :, 7] = (e+pt)*v-by*(u*bx+v*by+w*bz)
    fy[:, :, 8] = ch*ch*by
    return fy
# HLLDリーマンソルバ (Miyoshi and Kusano, 2005)
def hlldx(ql, qr, ch):
    # 密度
    rhol = ql[:, :, 0]
    rhor = qr[:, :, 0]
    # 運動量
    mxl = ql[:, :, 1]
    myl = ql[:, :, 2]
    mzl = ql[:, :, 3]
    mxr = qr[:, :, 1]
    myr = qr[:, :, 2]
    mzr = qr[:, :, 3]
    # 磁場
    bxl = ql[:, :, 4]
    byl = ql[:, :, 5]
    bzl = ql[:, :, 6]
    bxr = qr[:, :, 4]
    byr = qr[:, :, 5]
    bzr = qr[:, :, 6]
    # エネルギー
    el = ql[:, :, 7]
    er = qr[:, :, 7]
    # 磁場発散抑制のための人工ポテンシャル (Dedner et al., 2002)
    psil = ql[:, :, 8]
    psir = qr[:, :, 8]
    # 速度
    ul = mxl/rhol
    vl = myl/rhol
    wl = mzl/rhol
    ur = mxr/rhor
    vr = myr/rhor
    wr = mzr/rhor
    # (熱的) 圧力
    pl = (gam-1.0)*(el-0.5*rhol*(ul*ul+vl*vl+wl*wl)-0.5*(bxl*bxl+byl*byl+bzl*bzl))
    pr = (gam-1.0)*(er-0.5*rhor*(ur*ur+vr*vr+wr*wr)-0.5*(bxr*bxr+byr*byr+bzr*bzr))
    ptl = pl+0.5*(bxl*bxl+byl*byl+bzl*bzl)
    # 総圧力 (熱的圧力 + 磁気圧)
    ptr = pr+0.5*(bxr*bxr+byr*byr+bzr*bzr)
    # 音速
    al = np.sqrt(gam*pl/rhol)
    ar = np.sqrt(gam*pr/rhor)
    # Alfvén速度
    cal = np.sqrt((bxl*bxl+byl*byl+bzl*bzl)/rhol)
    car = np.sqrt((bxr*bxr+byr*byr+bzr*bzr)/rhor)
    caxl = np.sqrt(bxl*bxl/rhol)
    caxr = np.sqrt(bxr*bxr/rhor)
    # 速進磁気音波速度
    cfl = np.sqrt(0.5*(al*al+cal*cal+np.sqrt((al*al+cal*cal)**2-4.0*(al*caxl)**2)))
    cfr = np.sqrt(0.5*(ar*ar+car*car+np.sqrt((ar*ar+car*car)**2-4.0*(ar*caxr)**2)))
    sfl = np.minimum(ul-cfl, ur-cfr)
    sfr = np.maximum(ul+cfl, ur+cfr)
    sm = ((sfr-ur)*rhor*ur-(sfl-ul)*rhol*ul-ptr+ptl)/((sfr-ur)*rhor-(sfl-ul)*rhol)
    um = sm
    bxm = bxl+0.5*(bxr-bxl)-0.5/ch*(psir-psil)
    psim = psil+0.5*(psir-psil)-0.5*ch*(bxr-bxl)
    # リーマンファン外側
    ptm = ((sfr-ur)*rhor*ptl-(sfl-ul)*rhol*ptr+rhol*rhor*(sfr-ur)*(sfl-ul)*(ur-ul))/((sfr-ur)*rhor-(sfl-ul)*rhol)
    rhoml = rhol*(sfl-ul)/(sfl-sm)
    rhomr = rhor*(sfr-ur)/(sfr-sm)
    vol = vl-bxm*byl*(sm-ul)/(rhol*(sfl-ul)*(sfl-sm)-bxm*bxm)
    vor = vr-bxm*byr*(sm-ur)/(rhor*(sfr-ur)*(sfr-sm)-bxm*bxm)
    wol = wl-bxm*bzl*(sm-ul)/(rhol*(sfl-ul)*(sfl-sm)-bxm*bxm)
    wor = wr-bxm*bzr*(sm-ur)/(rhor*(sfr-ur)*(sfr-sm)-bxm*bxm)
    byol = byl*(rhol*(sfl-ul)*(sfl-ul)-bxm*bxm)/(rhol*(sfl-ul)*(sfl-sm)-bxm*bxm)
    byor = byr*(rhor*(sfr-ur)*(sfr-ur)-bxm*bxm)/(rhor*(sfr-ur)*(sfr-sm)-bxm*bxm)
    bzol = bzl*(rhol*(sfl-ul)*(sfl-ul)-bxm*bxm)/(rhol*(sfl-ul)*(sfl-sm)-bxm*bxm)
    bzor = bzr*(rhor*(sfr-ur)*(sfr-ur)-bxm*bxm)/(rhor*(sfr-ur)*(sfr-sm)-bxm*bxm)
    eol = ((sfl-ul)*el-ptl*ul+ptm*sm+bxm*(ul*bxl+vl*byl+wl*bzl-um*bxm-vol*byol-wol*bzol))/(sfl-sm)
    eor = ((sfr-ur)*er-ptr*ur+ptm*sm+bxm*(ur*bxr+vr*byr+wr*bzr-um*bxm-vor*byor-wor*bzor))/(sfr-sm)
    srrhoml = np.sqrt(rhoml)
    srrhomr = np.sqrt(rhomr)
    sal = sm-np.sqrt(bxm*bxm/rhoml)
    sar = sm+np.sqrt(bxm*bxm/rhomr)
    # リーマンファン内側
    vi = (srrhoml*vol+srrhomr*vor+(byor-byol)*np.sign(bxm))/(srrhoml+srrhomr)
    wi = (srrhoml*wol+srrhomr*wor+(bzor-bzol)*np.sign(bxm))/(srrhoml+srrhomr)
    byi = (srrhoml*byor+srrhomr*byol+srrhoml*srrhomr*(vor-vol)*np.sign(bxm))/(srrhoml+srrhomr)
    bzi = (srrhoml*bzor+srrhomr*bzol+srrhoml*srrhomr*(wor-wol)*np.sign(bxm))/(srrhoml+srrhomr)
    eil = eol-srrhoml*(vol*byol+wol*bzol-vi*byi-wi*bzi)*np.sign(bxm)
    eir = eor+srrhomr*(vor*byor+wor*bzor-vi*byi-wi*bzi)*np.sign(bxm)
    fxl = mhdfx(rhol, ul, vl, wl, bxm, byl, bzl, el, psim, ptl, ch)
    fxol = mhdfx(rhoml, um, vol, wol, bxm, byol, bzol, eol, psim, ptm, ch)
    fxil = mhdfx(rhoml, um, vi, wi, bxm, byi, bzi, eil, psim, ptm, ch)
    fxir = mhdfx(rhomr, um, vi, wi, bxm, byi, bzi, eir, psim, ptm, ch)
    fxor = mhdfx(rhomr, um, vor, wor, bxm, byor, bzor, eor, psim, ptm, ch)
    fxr = mhdfx(rhor, ur, vr, wr, bxm, byr, bzr, er, psim, ptr, ch)
    fx = np.zeros((XN, YN, VN))
    slindex = sfl > 0.0
    solindex = (0.0 >= sfl) & (sal > 0.0)
    silindex = (0.0 >= sal) & (sm > 0.0)
    sirindex = (0.0 >= sm) & (sar > 0.0)
    sorindex = (0.0 >= sar) & (sfr > 0.0)
    srindex = 0.0 >= sfr
    fx[slindex, :] = fxl[slindex, :]
    fx[solindex, :] = fxol[solindex, :]
    fx[silindex, :] = fxil[silindex, :]
    fx[sirindex, :] = fxir[sirindex, :]
    fx[sorindex, :] = fxor[sorindex, :]
    fx[srindex, :] = fxr[srindex, :]
    return fx
def hlldy(ql, qr, ch):
    # 密度
    rhol = ql[:, :, 0]
    rhor = qr[:, :, 0]
    # 運動量
    mxl = ql[:, :, 1]
    myl = ql[:, :, 2]
    mzl = ql[:, :, 3]
    mxr = qr[:, :, 1]
    myr = qr[:, :, 2]
    mzr = qr[:, :, 3]
    # 磁場
    bxl = ql[:, :, 4]
    byl = ql[:, :, 5]
    bzl = ql[:, :, 6]
    bxr = qr[:, :, 4]
    byr = qr[:, :, 5]
    bzr = qr[:, :, 6]
    # エネルギー
    el = ql[:, :, 7]
    er = qr[:, :, 7]
    # 磁場発散抑制のための人工ポテンシャル (Dedner et al., 2002)
    psil = ql[:, :, 8]
    psir = qr[:, :, 8]
    # 速度
    ul = mxl/rhol
    vl = myl/rhol
    wl = mzl/rhol
    ur = mxr/rhor
    vr = myr/rhor
    wr = mzr/rhor
    # (熱的) 圧力
    pl = (gam-1.0)*(el-0.5*rhol*(ul*ul+vl*vl+wl*wl)-0.5*(bxl*bxl+byl*byl+bzl*bzl))
    pr = (gam-1.0)*(er-0.5*rhor*(ur*ur+vr*vr+wr*wr)-0.5*(bxr*bxr+byr*byr+bzr*bzr))
    # 総圧力 (熱的圧力 + 磁気圧)
    ptl = pl+0.5*(bxl*bxl+byl*byl+bzl*bzl)
    ptr = pr+0.5*(bxr*bxr+byr*byr+bzr*bzr)
    # 音速
    al = np.sqrt(gam*pl/rhol)
    ar = np.sqrt(gam*pr/rhor)
    # Alfvén速度
    cal = np.sqrt((bxl*bxl+byl*byl+bzl*bzl)/rhol)
    car = np.sqrt((bxr*bxr+byr*byr+bzr*bzr)/rhor)
    cayl = np.sqrt(byl*byl/rhol)
    cayr = np.sqrt(byr*byr/rhor)
    # 速進磁気音波速度
    cfl = np.sqrt(0.5*(al*al+cal*cal+np.sqrt((al*al+cal*cal)**2-4.0*(al*cayl)**2)))
    cfr = np.sqrt(0.5*(ar*ar+car*car+np.sqrt((ar*ar+car*car)**2-4.0*(ar*cayr)**2)))
    sfl = np.minimum(vl-cfl, vr-cfr)
    sfr = np.maximum(vl+cfl, vr+cfr)
    sm = ((sfr-vr)*rhor*vr-(sfl-vl)*rhol*vl-ptr+ptl)/((sfr-vr)*rhor-(sfl-vl)*rhol)
    vm = sm
    bym = byl+0.5*(byr-byl)-0.5/ch*(psir-psil)
    psim = psil+0.5*(psir-psil)-0.5*ch*(byr-byl)
    # リーマンファン外側
    ptm = ((sfr-vr)*rhor*ptl-(sfl-vl)*rhol*ptr+rhol*rhor*(sfr-vr)*(sfl-vl)*(vr-vl))/((sfr-vr)*rhor-(sfl-vl)*rhol)
    rhoml = rhol*(sfl-vl)/(sfl-sm)
    rhomr = rhor*(sfr-vr)/(sfr-sm)
    wol = wl-bym*bzl*(sm-vl)/(rhol*(sfl-vl)*(sfl-sm)-bym*bym)
    wor = wr-bym*bzr*(sm-vr)/(rhor*(sfr-vr)*(sfr-sm)-bym*bym)
    uol = ul-bym*bxl*(sm-vl)/(rhol*(sfl-vl)*(sfl-sm)-bym*bym)
    uor = ur-bym*bxr*(sm-vr)/(rhor*(sfr-vr)*(sfr-sm)-bym*bym)
    bzol = bzl*(rhol*(sfl-vl)*(sfl-vl)-bym*bym)/(rhol*(sfl-vl)*(sfl-sm)-bym*bym)
    bzor = bzr*(rhor*(sfr-vr)*(sfr-vr)-bym*bym)/(rhor*(sfr-vr)*(sfr-sm)-bym*bym)
    bxol = bxl*(rhol*(sfl-vl)*(sfl-vl)-bym*bym)/(rhol*(sfl-vl)*(sfl-sm)-bym*bym)
    bxor = bxr*(rhor*(sfr-vr)*(sfr-vr)-bym*bym)/(rhor*(sfr-vr)*(sfr-sm)-bym*bym)
    eol = ((sfl-vl)*el-ptl*vl+ptm*sm+bym*(ul*bxl+vl*byl+wl*bzl-uol*bxol-vm*bym-wol*bzol))/(sfl-sm)
    eor = ((sfr-vr)*er-ptr*vr+ptm*sm+bym*(ur*bxr+vr*byr+wr*bzr-uor*bxor-vm*bym-wor*bzor))/(sfr-sm)
    srrhoml = np.sqrt(rhoml)
    srrhomr = np.sqrt(rhomr)
    sal = sm-np.sqrt(bym*bym/rhoml)
    sar = sm+np.sqrt(bym*bym/rhomr)
    # リーマンファン内側
    wi = (srrhoml*wol+srrhomr*wor+(bzor-bzol)*np.sign(bym))/(srrhoml+srrhomr)
    ui = (srrhoml*uol+srrhomr*uor+(bxor-bxol)*np.sign(bym))/(srrhoml+srrhomr)
    bzi = (srrhoml*bzor+srrhomr*bzol+srrhoml*srrhomr*(wor-wol)*np.sign(bym))/(srrhoml+srrhomr)
    bxi = (srrhoml*bxor+srrhomr*bxol+srrhoml*srrhomr*(uor-uol)*np.sign(bym))/(srrhoml+srrhomr)
    eil = eol-srrhoml*(uol*bxol+wol*bzol-ui*bxi-wi*bzi)*np.sign(bym)
    eir = eor+srrhomr*(uor*bxor+wor*bzor-ui*bxi-wi*bzi)*np.sign(bym)
    fyl = mhdfy(rhol, ul, vl, wl, bxl, bym, bzl, el, psim, ptl, ch)
    fyol = mhdfy(rhoml, uol, vm, wol, bxol, bym, bzol, eol, psim, ptm, ch)
    fyil = mhdfy(rhoml, ui, vm, wi, bxi, bym, bzi, eil, psim, ptm, ch)
    fyir = mhdfy(rhomr, ui, vm, wi, bxi, bym, bzi, eir, psim, ptm, ch)
    fyor = mhdfy(rhomr, uor, vm, wor, bxor, bym, bzor, eor, psim, ptm, ch)
    fyr = mhdfy(rhor, ur, vr, wr, bxr, bym, bzr, er, psim, ptr, ch)
    fy = np.zeros((XN, YN, VN))
    slindex = sfl > 0.0
    solindex = (0.0 >= sfl) & (sal > 0.0)
    silindex = (0.0 >= sal) & (sm > 0.0)
    sirindex = (0.0 >= sm) & (sar > 0.0)
    sorindex = (0.0 >= sar) & (sfr > 0.0)
    srindex = 0.0 >= sfr
    fy[slindex, :] = fyl[slindex, :]
    fy[solindex, :] = fyol[solindex, :]
    fy[silindex, :] = fyil[silindex, :]
    fy[sirindex, :] = fyir[sirindex, :]
    fy[sorindex, :] = fyor[sorindex, :]
    fy[srindex, :] = fyr[srindex, :]
    return fy
# 1次精度風上差分
def upwindx(q):
    qil = np.roll(q, 1, axis = 0)
    qi = q
    ql = qil
    qr = qi
    return ql, qr
def upwindy(q):
    qjl = np.roll(q, 1, axis = 1)
    qj = q
    ql = qjl
    qr = qj
    return ql, qr
# 2次精度MUSCL (minmod) (van Leer, 1979)
def musclx(q):
    qill = np.roll(q, 2, axis = 0)
    qil = np.roll(q, 1, axis = 0)
    qi = q
    qir = np.roll(q, -1, axis = 0)
    ql = qil+0.5*minmod(qi-qil, qil-qill)
    qr = qi-0.5*minmod(qir-qi, qi-qil)
    return ql, qr
def muscly(q):
    qjll = np.roll(q, 2, axis = 1)
    qjl = np.roll(q, 1, axis = 1)
    qj = q
    qjr = np.roll(q, -1, axis = 1)
    ql = qjl+0.5*minmod(qj-qjl, qjl-qjll)
    qr = qj-0.5*minmod(qjr-qj, qj-qjl)
    return ql, qr
# 5次精度MP5 (Suresh and Huynh, 1997)
def mp5x(q):
    qilll = np.roll(q, 3, axis = 0)
    qill = np.roll(q, 2, axis = 0)
    qil = np.roll(q, 1, axis = 0)
    qi = q
    qir = np.roll(q, -1, axis = 0)
    qirr = np.roll(q, -2, axis = 0)
    dll = qilll-2.0*qill+qil
    dl = qill-2.0*qil+qi
    d = qil-2.0*qi+qir
    dr = qi-2.0*qir+qirr
    dmml = minmod(dll, dl)
    dmm = minmod(dl, d)
    dmmr = minmod(d, dr)
    qull = qil+2.0*(qil-qill)
    qulr = qi+2.0*(qi-qir)
    #qull = qil+4.0*(qil-qill)
    #qulr = qi+4.0*(qi-qir)
    qav = 0.5*(qil+qi)
    qmd = qav-0.5*dmm
    qlcl = qil+0.5*(qil-qill)+4.0/3.0*dmml
    qlcr = qi+0.5*(qi-qir)+4.0/3.0*dmmr
    qminl = np.maximum(np.minimum(qil, qi, qmd), np.minimum(qil, qull, qlcl))
    qmaxl = np.minimum(np.maximum(qil, qi, qmd), np.maximum(qil, qull, qlcl))
    qminr = np.maximum(np.minimum(qi, qil, qmd), np.minimum(qi, qulr, qlcr))
    qmaxr = np.minimum(np.maximum(qi, qil, qmd), np.maximum(qi, qulr, qlcr))
    q5l = (2.0*qilll-13.0*qill+47.0*qil+27.0*qi-3.0*qir)/60.0
    q5r = (2.0*qirr-13.0*qir+47.0*qi+27.0*qil-3.0*qill)/60.0
    ql = median(q5l, qminl, qmaxl)
    qr = median(q5r, qminr, qmaxr)
    return ql, qr
def mp5y(q):
    qjlll = np.roll(q, 3, axis = 1)
    qjll = np.roll(q, 2, axis = 1)
    qjl = np.roll(q, 1, axis = 1)
    qj = q
    qjr = np.roll(q, -1, axis = 1)
    qjrr = np.roll(q, -2, axis = 1)
    dll = qjlll-2.0*qjll+qjl
    dl = qjll-2.0*qjl+qj
    d = qjl-2.0*qj+qjr
    dr = qj-2.0*qjr+qjrr
    dmml = minmod(dll, dl)
    dmm = minmod(dl, d)
    dmmr = minmod(d, dr)
    qull = qjl+2.0*(qjl-qjll)
    qulr = qj+2.0*(qj-qjr)
    #qull = qjl+4.0*(qjl-qjll)
    #qulr = qj+4.0*(qj-qjr)
    qav = 0.5*(qjl+qj)
    qmd = qav-0.5*dmm
    qlcl = qjl+0.5*(qjl-qjll)+4.0/3.0*dmml
    qlcr = qj+0.5*(qj-qjr)+4.0/3.0*dmmr
    qminl = np.maximum(np.minimum(qjl, qj, qmd), np.minimum(qjl, qull, qlcl))
    qmaxl = np.minimum(np.maximum(qjl, qj, qmd), np.maximum(qjl, qull, qlcl))
    qminr = np.maximum(np.minimum(qj, qjl, qmd), np.minimum(qj, qulr, qlcr))
    qmaxr = np.minimum(np.maximum(qj, qjl, qmd), np.maximum(qj, qulr, qlcr))
    q5l = (2.0*qjlll-13.0*qjll+47.0*qjl+27.0*qj-3.0*qjr)/60.0
    q5r = (2.0*qjrr-13.0*qjr+47.0*qj+27.0*qjl-3.0*qjll)/60.0
    ql = median(q5l, qminl, qmaxl)
    qr = median(q5r, qminr, qmaxr)    
    return ql, qr
# dq/dtの計算
def dqdt(q, ch):
    # x方向
    #ql, qr = upwindx(q)
    #ql, qr = musclx(q)
    ql, qr = mp5x(q)
    fx = hlldx(ql, qr, ch)
    # y方向
    #ql, qr = upwindy(q)
    #ql, qr = muscly(q)
    ql, qr = mp5y(q)
    fy = hlldy(ql, qr, ch)
    # 保存則の計算
    fxi = fx
    fyj = fy
    fxir = np.roll(fx, -1, axis = 0)
    fyjr = np.roll(fy, -1, axis = 1)
    dqdt = -(fxir-fxi)/dx-(fyjr-fyj)/dy
    return dqdt
# 1次精度Euler法
def euler(q, dt):
    ch = CFL*np.minimum(dx, dy)/dt
    q1 = q+dqdt(q, ch)*dt
    return q1
# 2次精度Runge-Kutta
def ssprk2(q, dt):
    ch = CFL*np.minimum(dx, dy)/dt
    q1 = q+dqdt(q, ch)*dt
    q2 = 0.5*q+0.5*(q1+dqdt(q1, ch)*dt)
    return q2
# 3次精度Runge-Kutta
def ssprk3(q, dt):
    ch = CFL*np.minimum(dx, dy)/dt
    q1 = q+dqdt(q, ch)*dt
    q2 = 0.75*q+0.25*(q1+dqdt(q1, ch)*dt)
    q3 = 1.0/3.0*q+2.0/3.0*(q2+dqdt(q2, ch)*dt)
    return q3

def main():
    # rho: 密度
    # p:   圧力
    # u:   x方向速度
    # v:   y方向速度
    # w:   z方向速度
    # mx:  x方向運動量
    # my:  y方向運動量
    # mz:  z方向運動量
    # bx:  x方向磁場
    # by:  y方向磁場
    # bz:  z方向磁場
    # e:   エネルギー
    # psi: 磁場発散抑制のための人工ポテンシャル (Dedner et al., 2002)
    # q: 保存変数ベクトル
    # -- 0: rho
    # -- 1: mx
    # -- 2: my
    # -- 3: mz
    # -- 4: bx
    # -- 5: by
    # -- 6: bz
    # -- 7: e
    # -- 8: psi
    x = np.arange(0.0, XL, dx)
    y = np.arange(0.0, YL, dy)
    # 初期値
    rho = np.ones((XN, YN))*gam**2
    p = np.ones((XN, YN))*gam
    u = np.tile(-np.sin(y), (XN, 1))
    v = np.tile(np.sin(x), (YN, 1)).T
    w = np.zeros((XN, YN))
    bx = np.tile(-np.sin(y), (XN, 1))
    by = np.tile(np.sin(2.0*x), (YN, 1)).T
    bz = np.zeros((XN, YN))
    e = p/(gam-1.0)+0.5*rho*(u*u+v*v+w*w)+0.5*(bx*bx+by*by+bz*bz)
    psi = np.zeros((XN, YN))
    q = np.zeros((XN, YN, VN)); q_n = np.zeros((XN, YN, VN))
    q[:, :, 0] = rho
    q[:, :, 1] = rho*u
    q[:, :, 2] = rho*v
    q[:, :, 3] = rho*w
    q[:, :, 4] = bx
    q[:, :, 5] = by
    q[:, :, 6] = bz
    q[:, :, 7] = e
    q[:, :, 8] = psi
    # 計算開始
    n = 0
    t = 0.0
    step = 0
    start = time.time()
    while (t <= TL):
        # 密度
        rho = q[:, :, 0]
        # 運動量
        mx = q[:, :, 1]
        my = q[:, :, 2]
        mz = q[:, :, 3]
        # 磁場
        bx = q[:, :, 4]
        by = q[:, :, 5]
        bz = q[:, :, 6]
        # エネルギー
        e = q[:, :, 7]
        # 磁場発散抑制のための人工ポテンシャル (Dedner et al., 2002)
        psi = q[:, :, 8]
        # 速度
        u = mx/rho
        v = my/rho
        w = mz/rho
        # (熱的) 圧力
        p = (gam-1.0)*(e-0.5*rho*(u*u+v*v+w*w)-0.5*(bx*bx+by*by+bz*bz))
        # 音速
        a = np.sqrt(gam*p/rho)
        # Alfvén速度
        ca = np.sqrt((bx*bx+by*by+bz*bz)/rho)
        cax = np.sqrt(bx*bx/rho)
        cay = np.sqrt(by*by/rho)
        # 速進磁気音波速度
        cfx = np.sqrt(0.5*(a*a+ca*ca+np.sqrt((a*a+ca*ca)**2-4.0*(a*cax)**2)))
        cfy = np.sqrt(0.5*(a*a+ca*ca+np.sqrt((a*a+ca*ca)**2-4.0*(a*cay)**2)))
        # CFL数をもとにdtを設定
        dt = CFL*np.minimum(dx/np.max(np.abs(u)+cfx), dy/np.max(np.abs(v)+cfy))
        # 磁場発散抑制のためのパラメータ (Dedner et al., 2002)
        ch = CFL*np.minimum(dx, dy)/dt
        cd = np.exp(-dt*ch/cr)
        # 時間発展
        #q_n = euler(q, dt)
        #q_n = ssprk2(q, dt)
        q_n = ssprk3(q, dt)
        q_n[:, :, 8] = cd*q_n[:, :, 8]
        q = q_n
        # ファイル出力
        if (np.floor(t*PN/TL) != np.floor((t-dt)*PN/TL)) :
            print("n: {0:3d}, t: {1:.2f}".format(n, t))
            #varname = ["r", "p", "u", "v", "w", "bx", "by", "bz", "ps"]
            #vardata = [rho, p, u, v, w, bx, by, bz, psi]
            #for i in range(VN):
            #    np.savetxt("../data/"+varname[i]+"_{0:0>3}.csv".format(n), vardata[i].T, delimiter = ",")
            n += 1
        t += dt
        step += 1
    elapsed_time = time.time()-start
    print("elapsed_time: {0} s".format(elapsed_time))
    # 計算終了
    # 計算失敗の検知
    if (not np.isfinite(t)):
        print("Computaion failed: step = {0:3d}".format(step))
    # y = πでの値
    print("   x    rho      p      u      v      w     bx     by     bz    psi")
    for i in range(0, XN, 5):
        j = YN//2
        print("{0:4.2f}  {1:5.2f}  {2:5.2f}  {3:5.2f}  {4:5.2f}  {5:5.2f}  {6:5.2f}  {7:5.2f}  {8:5.2f}  {9:5.2f}"
              .format(dx*i, rho[i, j], p[i, j], u[i, j], v[i, j], w[i, j], bx[i, j], by[i, j], bz[i, j], psi[i, j]))
    # 結果をプロット
    fig = plt.figure()
    plt.axes().set_aspect("equal")
    plt.title("p (t = {0:.2f})".format(t))
    plt.xlabel("X")
    plt.ylabel("Y")
    im = plt.pcolor(x, y, p.T, cmap = cm.jet, shading = "auto")
    im.set_clim(0.0, 6.5)
    fig.colorbar(im)
    plt.show()

if __name__ == "__main__":
    main()

参考文献

  • Orszag, S., & Tang, C. (1979). Small-scale structure of two-dimensional magnetohydrodynamic turbulence. Journal of Fluid Mechanics, 90(1), 129-143. doi:10.1017/S002211207900210X
  • Takahiro Miyoshi, Kanya Kusano, A multi-state HLL approximate Riemann solver for ideal magnetohydrodynamics, Journal of Computational Physics, Volume 208, Issue 1, 2005, Pages 315-344, ISSN 0021-9991, https://doi.org/10.1016/j.jcp.2005.02.017.
  • A. Dedner, F. Kemm, D. Kröner, C.-D. Munz, T. Schnitzer, M. Wesenberg, Hyperbolic Divergence Cleaning for the MHD Equations,Journal of Computational Physics, Volume 175, Issue 2, 2002, Pages 645-673, ISSN 0021-9991, https://doi.org/10.1006/jcph.2001.6961.
  • Bram van Leer, Towards the ultimate conservative difference scheme. V. A second-order sequel to Godunov's method, Journal of Computational Physics, Volume 32, Issue 1, 1979, Pages 101-136, ISSN 0021-9991, https://doi.org/10.1016/0021-9991(79)90145-1.
  • Roe, P L, Characteristic-Based Schemes for the Euler Equations, Annual Review of Fluid Mechanics, Volume 18, 337-365, 1986, https://doi.org/10.1146/annurev.fl.18.010186.002005
  • Bram Van Leer, Towards the ultimate conservative difference scheme III. Upstream-centered finite-difference schemes for ideal compressible flow, Journal of Computational Physics, Volume 23, Issue 3, 1977, Pages 263-275, ISSN 0021-9991, https://doi.org/10.1016/0021-9991(77)90094-8.
  • A. Suresh, H.T. Huynh, Accurate Monotonicity-Preserving Schemes with Runge–Kutta Time Stepping, Journal of Computational Physics, Volume 136, Issue 1, 1997, Pages 83-99, ISSN 0021-9991, https://doi.org/10.1006/jcph.1997.5745.
  • Chi-Wang Shu, Stanley Osher, Efficient implementation of essentially non-oscillatory shock-capturing schemes, Journal of Computational Physics, Volume 77, Issue 2, 1988, Pages 439-471, ISSN 0021-9991, https://doi.org/10.1016/0021-9991(88)90177-5.

また、今回の記事執筆に際し、以下のページを全般に渡って参考にしています。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【mac】PythonでRealSense D415のサンプルを動かす

RealSense D415とは

Intel社の提供するRealsenseは奥行き知覚機能を有するカメラデバイス.公式ではmacのサポートをしていないが,librealsenseという非公式のライブラリで動かすことができる.

環境

  • macBookPro(13-inch, 2017, Twoo Thunderbolt 3ports)
  • macOS Catalina 10.15.7
  • zsh
  • Homebrewインストール済
  • PyenvでPythonの環境構築済

環境構築手順

必要なライブラリをインストール

ターミナルを開きホームディレクトリで以下のコマンドを一行ずつ実行する

ターミナル
$ brew install python
$ brew install libusb pkg-config
$ brew install homebrew/core/glfw3
$ brew install cmake
$ brew install boost
$ brew install flann
$ brew install vtk5 –with-qt
$ git clone https://github.com/PointCloudLibrary/pcl
$ cd pcl
$ mkdir build && cd build
$ cmake ..
$ make
$ sudo make install

librealsenseのソースコードをインストール&build

cdコマンドでホームディレクトリに戻り,librealsenseの本体をインストールする
3行目の使用しているPythonの絶対パスの部分はwhich pythonで出力されたパスに置き換える

ターミナル
$ git clone https://github.com/IntelRealSense/librealsense.git
$ cd librealsense
$ mkdir build && cd build
$ cmake .. -DBUILD_EXAMPLES=true -DBUILD_WITH_OPENMP=false -DHWM_OVER_XU=false -DBUILD_PYTHON_BINDINGS=true -DPYTHON_EXECUTABLE:FILEPATH=使用しているPyhtonの絶対パス -G "Unix Makefiles"
$ make -j2
$ sudo make install

私の環境ではPythonのバージョンが3.8系では動かなかったため事前に3.6系へバージョンを変えておく(今回はpython3.6.7を使う)
/libresenseディレクトリに戻り

ターミナル
$ pyenv install 3.6.7
$ pyenv local 3.6.7

pyenv versionsコマンドで/libresenseディレクトリ以下がpython3.6.7になっていることを確認する
また,このままだと動かないと思うので以下の手順でsoファイルがおいてある場所(以下A)を確認する

  1. VSCode等でlibrealsense/build/CMakeCache.txtを開く
  2. command + Fで検索窓を開きPYTHON_INSTALL_DIR:PATHを検索する
  3. PYTHON_INSTALL_DIR:PATH:PATH=xxxとかいてある.xxxがsoファイルの置いてあるディレクトリ(A)

シンボリックを置く場所(以下B)を確認する

ターミナル
$ python3
>>>import site; site.getsitepackages()

その後確認した場所(B)へ移動し,シンボリックを作成する

ターミナル
$ cd Bへの絶対パス
$ ln -s 確認したsoファイルのパス(A)/pyrealsense2.cpython-36m-darwin.so pyrealsense2.so
$ ln -s 確認したsoファイルのパス(A)/pybackend2.cpython-36m-darwin.so pybackend2.so

pyrealsense2のインポートが通るか確認

ターミナル
$ python
>>> import pyrealsense2
>>>

このようにimportが通ればOK

サンプルを動かす

macとRealsenseをUSB接続し,サンプルファイルが置いてあるディレクトリに移動し実行する

ターミナル
$ cd ~/librealsense/wrappers/python/examples
$ python 動かしたいファイル名

参考
(参考Qiita1)[https://qiita.com/mizumasa/items/c6ca2c73fa86c9a5a39c]

(参考Qiita2)[https://qiita.com/comachi/items/14b358568365bc5ee474]

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

RhinocerosでモデリングしたネットワークをPythonで読み込む②

前回は、建築・都市におけるネットワーク分析の一例として歩行空間ネットワークを取り上げ、地図情報をもとにRhinocerosで実際に歩行空間ネットワークを描きました。
→RhinocerosでモデリングしたネットワークをPythonで読み込む①

本記事では、描いたネットワークを分析ができる形にすることが目的です。

目標

  1. Rhinocerosでネットワークを描く
  2. Rhinocerosで描いたネットワークをPythonで読み込めるように書き出す
  3. 書き出したファイルをPythonで読み込んで隣接配列を作る

本記事では第2項に取り組みます。

隣接配列

ネットワークを表すデータ構造には隣接行列、距離行列、隣接配列などがあります。本記事では、ネットワークを隣接配列として表現する方法を紹介します。

その前に、それぞれのデータ構造について簡単な説明を以下に述べます。

・隣接行列
順序付けられたノードの隣接関係を0/1(隣接していない/隣接している)で表した正方行列を、隣接行列と言います。エッジの重みを考えず、ノードの隣接関係だけに着目したものを、厳密にはグラフと呼びます。

例えば、

を隣接行列で表すと、

\begin{pmatrix}
0&1&0&1\\
1&0&1&0\\
0&1&0&1\\
1&0&1&0
\end{pmatrix}

になります。

隣接行列を使えば、ノードからノードへの経路数を求めるなどが代数的にできます。上のグラフはエッジに向きがない(無向グラフ)ので隣接行列は対称ですが、対称性を崩して一方通行のエッジを表現することも可能です(有向グラフ)。

・距離行列
隣接行列の各成分は、ノード間の隣接関係を表していました。距離行列は、その要素にノード間を結ぶエッジの重みが与えられたものです。先ほどのグラフのエッジに重みを与えたネットワーク

の距離行列は、

\begin{pmatrix}
0&1.5&0&8\\
1.5&0&5&0\\
0&5&0&2.3\\
8&0&2.3&0
\end{pmatrix}

になります。隣接していないノード間の要素に、便宜的に0を置いていますが、エッジの重みが0であることを意味しないことに注意してください。

・隣接配列
先のふたつのデータ構造は、どちらも行列を用いたものです。ここで、建築や都市の中にあるネットワークを考えてみると、あるノードと隣接するノードは高々4つか5つくらいであることが想像できます。そのとき、隣接行列、距離行列は0がたくさん並んだものになって、とても無駄が多いものになります。そこで、隣接しているノード間の関係だけを記憶しておこうというのが隣接配列です。

先ほどのネットワークの隣接配列を実際に作ってみると、次のようになります。

\begin{align}
&1: &[2, 4]\\
&2: &[1, 3]\\
&3: &[2, 4]\\
&4: &[1, 3]
\end{align}

コロンの左に注目するノード、右に隣接するノードの配列を記しています。同時に、エッジの重みも同様にして記憶しておきましょう。これを仮に距離配列と呼びます。

\begin{align}
&1: &[1.5, 8]\\
&2: &[1.5, 5]\\
&3: &[5, 2.3]\\
&4: &[8, 2.3]
\end{align}

隣接配列と距離配列の順序が対応していることに注意してください。この対応によって、「隣接配列からノード3とノード2が隣接していることがわかる。そのエッジの重みは、ノード2がノード3の隣接配列の1番目にあるから、ノード3の距離配列の1番目を見て5だとわかる」というような検索が可能になります。

隣接配列を使えば、ネットワークが大きなものになっても要領よく記憶しておくことができます。やりたい分析や解析によっては隣接行列や距離行列が必要になることもありますが、当面は隣接配列で十分だと思われます。

モデリングしたネットワークを隣接配列にして書き出す

それではいよいよ、Rhinocerosでモデリングしたネットワークを隣接配列にしていきます。
環境はWindows10、Rhinoceros 5です。

方法

以下のコードでは、ネットワークがすべて直線分でモデリングされていることを要求します。

まず、Rhinocerosのコマンドに、

EditPythonScript

と入力してPythonエディタを起動し、以下のコードを張り付けて、走らせます。check pointと書かれた部分については、後に説明を加えます。

rhino_network_write.py
# coding utf-8

import rhinoscriptsyntax as rs

import math
import random
import sys
import os

""" check point1 """
edges = rs.GetObjects("select objects")

adjacentList = {}
attributeList = {}
""" check point2 """
coordinations = []
coordinations_rounded = []  # for checking if a point is searched already or not

"""
example
adjacentList =  {0: [1, 2, 3], 1: [0, 2],...}
attributeList = {0: [2.0, 5.0, 8.6], 1: [2.0, 4.7], ...}
coordinations = [[4.5, 8.2, 0.0], [5.2, 3.6, 0.0], ...]
"""

cnt = 0   # point id
for i in range(len(edges)):
    edge = edges[i]
    try:
        end_points = rs.CurvePoints(edge)
    except:
        rs.ObjectColor(edge, (255, 0, 0))
        print("Error", edge)
        break

    l = rs.CurveLength(edge)
    """ check point3 """
    kind = rs.ObjectLayer(edge)

    for p in end_points:
        """ check point4 """
        p_rounded = [round(p[0], 2), round(p[1], 2), round(p[2], 2)] # round to the second decimal place
        if p_rounded in coordinations_rounded: # this point is already searched
            pass
        else: # search a new point 
            coordinations.append(p)
            coordinations_rounded.append(p_rounded)
            adjacentList[cnt] = []
            attributeList[cnt] = []
            cnt += 1

    # two end points are connected
    point_ids = [coordinations_rounded.index([round(p[0], 2), round(p[1], 2), round(p[2], 2)]) for p in end_points]

    adjacentList[point_ids[0]].append(point_ids[1])
    adjacentList[point_ids[1]].append(point_ids[0])

    attributeList[point_ids[0]].append((l, kind)) # you can add another info in attributeList
    attributeList[point_ids[1]].append((l, kind))

for k in adjacentList.keys():
    rs.AddTextDot(k, coordinations[k])

# set your path to the project folder
f = open("C:\\Users\\Owner\\Desktop\\adjacentList.txt", "w")
for k, v in adjacentList.items():
    print(k, v)
    f.write(str(k)+"\n")
    for nei in v:
        f.write(str(nei)+",")
    f.write("\n")
f.close()

f = open("C:\\Users\\Owner\\Desktop\\attributeList.txt", "w")
for k, v in attributeList.items():
    print(k, v)
    f.write(str(k)+"\n")
    for ats in v:   #ats = (length, layer_name)
        for at in ats:
            f.write(str(at)+"\n")
    f.write("_\n")
f.close()

f = open("C:\\Users\\Owner\\Desktop\\coordinations.txt", "w")
for p in coordinations:
    for u in p:
        f.write(str(u)+"\n")
f.close()

コードが走り終わると、adjacentList.txt、attributeList.txt、coordinations.txtが得られます。

コードの注記

・check point1
コードを走らせると、まず、オブジェクトを選択するよう求められますが、それがこの箇所になります。選択するオブジェクトは、モデリングしたネットワークを構成する直線分すべてです。

・check point2
分析結果の可視化など、追々ノードの座標が必要になるので、このコードの中で記憶しておきます。

・check point3
このコードでは、エッジの重みとしてエッジの長さを想定しています。ただ、分析の目的によっては長さだけでなく、性質(平坦な道なのか階段なのか、一般道なのか高速道なのか)も紐づけておきたくなることがあります。それらの性質ごとにレイヤー分けしてモデリングしたならば、この行でそのレイヤー名をエッジに紐づけて記憶しておくことができます。レイヤー名は半角英数字としてください。

・check point4
ネットワークをモデリングする際、スナップを効かせて線分の端点は一致させておくことが可能で、たいていの場合、座標が等しいかどうかでノードの同一性を確かめることが可能です。ただ、時折スナップを効かせられていないことに気づかなかった場合、同じノードが違うノードとしてデータ化されてしまうことがあります。それを防ぐために、あらかじめ許容誤差を設定しておいて、座標の差異がその誤差の範囲内なら同一ノードとみなすこととしています。ここでは、その誤差を小数第2位としていますが、ネットワークの広さやモデリングの際の単位に気を付けて設定してください。

デモンストレーション

RhinocerosでPythonを動かすのが初めての人用に、画面遷移を示したいと思います。

まず、Rhinocerosを起動し、モデリングしたネットワークのファイルを開きます。次に、コマンドに

EditPythonScript

と打ち込み、エディタを起動します。

上図では、歩道(network\street)と横断歩道(network\crosswalk)をレイヤーで分けてモデリングしています。

エディタ上部の緑の▶をクリックしてコードを走らせます。すると、Rhinocerosのコマンドに

select objects:

と出てくるので、ネットワーク全体を選択してEnterを押します。

すると、adjacentList.txt、attributeList.txt、coordinations.txtが得られるとともに、ノードの位置にテキストドットで番号が振られます。出来上がったデータが正しいかどうか確かめる手段の一つとしてお使いください。

【次回】書き出したネットワークデータをPythonで読み込む

お気づきの方も多いかもしれませんが、実は、本記事で紹介したコードの中で、すでに、隣接配列そのものを一度作っており、それをまた.txtにして書き出すという回りくどいことをしました。Pythonにはネットワーク分析関連のモジュールが豊富に用意されているのですが、Rhinoceros上のPythonでは使えないことが多いためこのようなことをしています。

次回は、得られた.txtを隣接配列に再翻訳することで、Python上での分析を可能にします。

一連の記事に関連するコードは、以下にあります。
→関連するコード

ホームページのご案内

建築・都市計画数理に関する私の研究の内容や、研究にまつわるtips等は、個人ホームページにて公開しています。ゆくゆくは、気軽に使える分析ツールも作っていきたいと思っているので、ぜひ覗きに来てください。
→田端祥太の個人ホームページ

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

pythonでつくる数字判別アプリで6桁の数字を判別させてみた

ff.PNG

はじめまして。
devと申します。

Qiitaへの投稿は初めてです。
この投稿がどなたかの役に立てばと思い、記事を作成しています。

今回はOCRを利用した数字判別アプリについてご紹介します。

アプリについて

ごく単純ですが、入力された画像をOCR(Google cloud vision api)で判別し、答えを返すWebアプリです。

作成理由

車のスピードメーターみたいなアナログなメーターを読み取れたら便利ですよね。
サービスとしては既に提供されているようだけど、「自分も少し近いことしてみたいなぁ」と思ったのが発端。

でもメーターの読み取りって難易度高そう。初めてのアプリなので、「まずはアナログな走行距離数字を読み取ってみよう!」という訳で数字の識別にチャレンジです。

基本機能

6桁数字の画像を選んでポチれば、読み取った数字をWeb画面に返します。

例えばこんな画像を読み取れます。
sample1.png
sample2.png

作って感じたのは「Google cloud vision api」を使えばこんな簡単に高精度のアプリができるんだ!ってことですね。

簡単ながら、精度もGOOD(le)!

しかも、数字だけでなく文字も判定できちゃいます。

だからこんなのもOK

図1.png
あ1.png

でも何に使えるの?

お遊びアプリなので、このままでは「何にも使えません」 笑

発展形としては「書類NOの通し番号読み取り」や「伝票読み取り」にも使えるかと思います。

別に「Google cloud vision api」を使わずとも「Tesseract」や、その他フリーソフトでもOCR機能は実現できますね。

実装環境

html
css
Flask

選択肢

数字を読み取るために、以下の2つを検討しました。
・mnistの学習データ
・OCR

mnistデータセットを使って学習させる

mnistの場合、学習させるのは比較的容易です。
ただし桁数を表現するために、1桁目、2桁目といった物体検出が必要。

学習データは以下のような方法で保存できます。
■[参考]

mnist学習のサンプルコード
from keras.datasets import mnist
from keras.models import Sequential, load_model
from keras.layers.core import Dense, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D, Dropout, Reshape
from keras.utils import np_utils
import numpy as np
(X_train, y_train),(X_test, y_test) = mnist.load_data()
X_train = np.array(X_train)/255
X_test = np.array(X_test)/255
y_train = np_utils.to_categorical(y_train)
y_test = np_utils.to_categorical(y_test)
model = Sequential()
model.add(Reshape((28,28,1),input_shape=(28,28)))
model.add(Conv2D(32,(3,3)))
model.add(Activation("relu"))
model.add(Conv2D(32,(3,3)))
model.add(Activation("relu"))
model.add(MaxPooling2D((2,2)))
model.add(Dropout(0.5))
model.add(Conv2D(16,(3,3)))
model.add(Activation("relu"))
model.add(MaxPooling2D((2,2)))
model.add(Dropout(0.5))
model.add(Flatten())
model.add(Dense(784))
model.add(Activation("relu"))
model.add(Dropout(0.5))
model.add(Dense(10))
model.add(Activation("softmax"))
model.compile(loss="categorical_crossentropy", optimizer="sgd", metrics=["accuracy"])
hist = model.fit(X_train, y_train, batch_size=200,
                 verbose=1, epochs=1, validation_split=0.1)
score = model.evaluate(X_test, y_test, verbose=1)
print('Test loss:', score[0])
print("test accuracy:", score[1])
model.save("C:/test/mnist_main.h5")

OCRを使って判別させる

何よりも手っ取り早いのがOCR。
GoogleのAPIを使うので精度も高く、作りこむ必要がありません。
桁数も気にせず読み取ってくれます。

用途次第では十分利用できますね!
※「Google cloud vision api」実装はこちらを参考にさせていただきました。

今後の発展

業務に合わせて、書類の通しNoを読み取る機能が実装できると役に立ちそうだなと思ってます。
空き時間をみて学習させてみるつもりです。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonで学ぶ簿記 ~簿記一巡の流れ編~

はじめに

Python(主にPandas)で簿記一巡の流れをコーディングしながら理解することが、この記事の目的です。

おおまかな流れの理解を目的としているため、簿記の教科書に出てくる項目を一部省略して説明しています。
例えば、

  • 商品在庫、仕入金額を元にした売上原価の算定
  • 精算表の作成
  • 翌期への繰越

といった事項は割愛しています。

0.簿記とは

企業で行われた取引を仕訳として記録し、貸借対照表(B/S)損益計算書(P/L)を作成する手続きです。

仕訳のイメージは以下のとおりです。
image.png

仕訳の左側を借方右側を貸方と呼びます。
残念ながら、これは覚えるしかない約束事です。

貸借対照表(B/S)は企業の財政状態、平たく言うと
「企業が今どれだけの財産を持っているか」
をあらわす表です。

image.png

損益計算書(P/L)は企業の経営成績、平たく言うと
「企業が一定の期間にどれだけ儲かったか(損したか)」
をあらわす表です。

image.png

簿記の手続きは、ごく大まかに言うと以下1~3の流れで行います。

1. 仕訳の作成

  取引を仕訳として仕訳帳に記録する。

2. 残高試算表(T/B)の作成

  仕訳帳から勘定科目ごとの借方金額、貸方金額を集計し、残高試算表を作成する。

3. 貸借対照表(B/S)、損益計算書(P/L)の作成

  残高試算表を貸借対照表(B/S)損益計算書(P/L)に分割する。(注1)

なお1.仕訳の作成にあたっては以下4つのルールを守る必要があります。

  • 仕訳ルール①:借方(左側)貸方(右側)勘定科目金額を記載する
  • 仕訳ルール②:借方、貸方それぞれの合計金額は必ず一致する
  • 仕訳ルール③:資産の増加は借方、負債純資産の増加は貸方に記載する
  • 仕訳ルール④:費用は借方、収益は貸方に記載する

・・・ここまで、簿記特有の用語が次から次へと現れました。
特に簿記の初学者の方は、用語の多さに戸惑うことと思います。

まずは、簿記の世界では

  • 企業の取引を仕訳という形式であらわす
  • 仕訳の左側を借方右側を貸方と呼ぶ

という点だけ覚えてください。

その他用語については、仕訳を作成する中で①~④のルールを都度引用しながら説明していきます。

(注1)

簿記の教科書では、1→2の間に「決算整理仕訳」を作成し、2→3の間に「精算表」を作成する、とあります。

 残高試算表 → 決算整理仕訳 → 貸借対照表、損益計算書

の数字を記録するのが精算表です。

しかし経理業務の現場では、決算整理仕訳は通常の仕訳と区別せず行うケースが多いものと思われます。

また2→3はシステム上自動で生成され、わざわざ精算表を作成しないケースが多いと思われるため、本稿では割愛します。
(簿記の試験のように手書きで財務諸表を作成する際には必要な表ですが。。)

1.仕訳の作成

【やりたいこと】
取引を仕訳として仕訳帳に記録します。

【コーディングの方針】
仕訳をPandasのDataFrame形式で、仕訳帳 df_siwake に入力します。
Siwakeクラスの関数 entry を呼び出すことで、df_siwakeに仕訳データの行を追加し、更新します。

仕訳ルール①:借方(左側)、貸方(右側)に勘定科目と金額を記載する

に従い、仕訳を入力するための空のDataFrameとして仕訳帳 df_siwake を作成します。

import pandas as pd
df_siwake = pd.DataFrame(index=[],
                         columns=['仕訳番号', '日付', '借方科目', '借方金額', '貸方科目', '貸方金額'])

例えば
(借方)現金 /(貸方)資本金
という仕訳を入力する場合、DataFrameの型に従って毎回

df_siwake.append(pd.Series([1, 20200401, '現金', 1000, '資本金', 1000], index=df_kamoku.columns), ignore_index=True)

というコードを書き、DataFrameの行を追加しても良いのですが、

  • 複合仕訳(借方と貸方が1対1対応していない仕訳)を入力できない ・・・①
  • 入力時に、仕訳のデータ型が正しいか(借方金額に文字列を入力するなど)確認できない ・・・②
  • 入力時に、仕訳の貸借が一致しているか確認できない ・・・③

との問題があります。

※簿記の世界では③が守られていることが非常に重要です。
仕訳入力時に必ず貸借が一致していることにより、後ほど残高試算表を作るときにも、
貸借金額が一致することが保証されます。

そこで仕訳入力用のクラス Siwake を定義し、まずは①複合仕訳の入力に対応する形で
関数 entry を定義します。

※最終的には Siwake 内に②データ型チェック、③貸借一致チェックの関数を実装しますが、
コードが長くなるため、末尾に補足として追記します。

class Siwake:
    def __init__(self):
        self.siwake_no = 0

    def entry(self, df, date, kari, kashi): 
        self.siwake_no += 1 # ...仕訳番号を更新

        for i in range(len(kari)): # ...複合仕訳に対応するため[借方科目、借方金額]の数だけループを回す                    
            kari_entry = pd.Series([self.siwake_no] + [date] + kari[i] + ["", 0], index=df.columns)          
            df = df.append(kari_entry, ignore_index=True)

        for i in range(len(kashi)): # ...複合仕訳に対応するため[貸方科目、貸方金額]の数だけループを回す                    
            kashi_entry = pd.Series([self.siwake_no] + [date] + ["", 0] + kashi[i], index=df.columns)
            df = df.append(kashi_entry, ignore_index=True)

        return df

クラス Siwake 内のコメントについて補足します。

仕訳帳には仕訳を日付順に記録していくため、クラス変数として仕訳番号siwake_noを定義しています。
仕訳が作成される(関数entryが呼び出される)ごとに

self.siwake_no += 1 # ...仕訳番号を更新

で仕訳番号を連番として更新しています。

また本稿では借方科目、貸方科目とも1行の仕訳のみを扱っていますが、実務上は複数行の仕訳を作成することが多いです。
複合仕訳に対応するため以下のようにループを回し、借方・貸方それぞれのpd.Seriesを生成した上で、
仕訳日記帳 df_siwake に行を追加しています。

for i in range(len(kari)): # ...複合仕訳に対応するため[借方科目、借方金額]の数だけループを回す
    kari_entry = pd.Series([self.siwake_no] + [date] + kari[i] + ["", 0], index=df.columns)          
    df = df.append(kari_entry, ignore_index=True)

for i in range(len(kashi)): # ...複合仕訳に対応するため[貸方科目、貸方金額]の数だけループを回す
    kashi_entry = pd.Series([self.siwake_no] + [date] + ["", 0] + kashi[i], index=df.columns)
    df = df.append(kashi_entry, ignore_index=True)

クラス Siwake の補足説明は以上です。

関数entryには以下の形式で入力します。

siwake.entry(df_siwake:仕訳帳, date:日付,
             kari :[[借方科目1, 借方金額1], [借方科目2, 借方金額2], ... ],
             kashi:[[貸方科目1, 貸方金額1], [貸方科目2, 貸方金額2], ... ])

すると仕訳日記帳 df_siwake に以下の行が追加されます。

index 仕訳番号 日付 借方科目 借方金額 貸方科目 貸方金額
0 siwake_no
(数値)
date
(数値)
kari[0][0]
(文字列)
kari[0][1]
(数値)
""
(文字列)
0
(数値)
1 siwake_no
(数値)
date
(数値)
""
(文字列)
0
(数値)
kashi[0][0]
(文字列)
kashi[0][1]
(数値)

それではクラスSiwakeのインスタンスを生成し、

siwake = Siwake()

会社運営の最も基本的なサイクルである

  1. 会社を設立
  2. 商品の仕入
  3. 商品の販売
  4. 仕入代金の支払
  5. 販売代金の回収

の各取引について仕訳を作成していきます。

仕訳①:会社を設立

現金1000を元手に会社を設立しました。

この元手のことを資本金と呼び、仕訳は以下の通りです。

df_siwake = siwake.entry(df_siwake, 20200401,
                         [['現金', 1000]],
                         [['資本金', 1000]])

df_siwake[df_siwake['仕訳番号']==siwake.siwake_no]
index 仕訳番号 日付 借方科目 借方金額 貸方科目 貸方金額
0 1 20200401 現金 1000 0
1 1 20200401 0 資本金 1000

会社に払い込まれた現金は会社のもの、つまり資産ですから借方に記載します。

一方、資本金は現金1000を払込した(株式会社であれば)株主のものです。純資産といい、貸方に記載します。

借方金額1000と貸方金額1000が必ず一致していることが大事です。

少し話は先に進みますが、貸方の純資産はおおまかに言うと

会社設立時に払い込まれた資本金 + 会社設立後に稼いだ利益

からなります。
事業を行うことで利益を蓄積し、(株式会社であれば)株主に配当として還元する、
というのが会社の基本サイクルです。

会社設立後に稼いだ利益が、貸方の純資産に来ることから、

  • 収益の発生 → 貸方の利益を増やす → 貸方に記載
  • 費用の発生 → 貸方の利益を減らす → 借方に記載

すなわち、
仕訳ルール④:費用は借方、収益は貸方に記載する
が導かれます。

仕訳②:商品の仕入れ

商品500を外部業者より仕入れました。
仕訳は以下の通りです。

df_siwake = siwake.entry(df_siwake, 20200402,
                         [['商品', 500]],
                         [['買掛金', 500]])

df_siwake[df_siwake['仕訳番号']==siwake.siwake_no]
index 仕訳番号 日付 借方科目 借方金額 貸方科目 貸方金額
2 2 20200402 商品 500 0
3 2 20200402 0 買掛金 500

仕入れた商品は会社のもの、つまり資産ですから借方に記載します。
ルール③:資産の増加は借方、負債と純資産の増加は貸方に記載する

会社間の取引において、仕入代金は仕入先と〇〇日後支払、などの条件を取り決めて
後払いすることがほとんどです。
(もちろんその場で支払する場合もあります)

この支払うまでの債務(代金を支払う義務)を買掛金といいます。

買掛金は将来、外部業者の現金となる、つまり負債ですから貸方に記載します。
ルール③:資産の増加は借方、負債と純資産の増加は貸方に記載する

仕訳③:商品を売上

仕入れた商品のうち200について、価格300で販売しました。
売上についての仕訳は以下の通りです。

df_siwake = siwake.entry(df_siwake, 20200403,
                         [['売掛金', 300]],
                         [['売上', 300]])

df_siwake[df_siwake['仕訳番号']==siwake.siwake_no]
index 仕訳番号 日付 借方科目 借方金額 貸方科目 貸方金額
4 3 20200403 売掛金 300 0
5 3 20200403 0 売上 300

会社間の取引において 、販売代金は販売先と〇〇日後回収、などの条件を取り決めて
後日回収することがほとんどです。
(もちらん、その場で回収する場合もあります)

この回収するまでの債権(代金をもらう権利)を売掛金といいます。

売掛金は将来、会社の現金となる、つまり資産ですから借方に記載します。
ルール③:資産の増加は借方、負債と純資産の増加は貸方に記載する

売上は収益であり、会社の純資産(貸方)を増やす効果がありますから、貸方に記載します。
ルール④:費用は借方、収益は貸方に記載する

売上に対応する原価についての仕訳は以下の通りです。

df_siwake = siwake.entry(df_siwake, 20200403,
                         [['売上原価', 200]],
                         [['商品', 200]])

df_siwake[df_siwake['仕訳番号']==siwake.siwake_no]
index 仕訳番号 日付 借方科目 借方金額 貸方科目 貸方金額
6 4 20200403 売上原価 200 0
7 4 20200403 0 商品 200

商品を販売することで、会社のものである商品(借方)が減るので、その反対である貸方に記載します
ルール③:資産の増加は借方、負債と純資産の増加は貸方に記載する
ルール'③:資産の減少は貸方、負債と純資産の減少は借方に記載する

商品の減少と共に、商品の売上に対応する費用売上原価として計上します。(注2)

費用は会社の純資産(貸方)を減らす効果がありますから、その反対である借方に記載します。
ルール④:費用は借方、収益は貸方に記載する

(注2)
本稿では簡略化のため、商品を売り上げたと同時に売上原価を計上しています。

実際には売上原価は、冒頭に少しだけ触れた「決算整理仕訳」により

売上原価 = 仕入金額の総額 + 期首の商品在庫金額 - 期末の商品在庫金額

という式で算定します。

なお原価200の商品を、販売価額300で売り上げたため、その取引の利益は100になります。

しかし簿記の手続きでは、商品を販売するたびに利益を認識することはありません。
簿記の目的は一定の期間(会計期間)における利益を算定することであるためです。

利益については、期末に当期分の収益および費用の合計金額を集計し、
その差額である利益について損益振替という仕訳を作成することで、
会計期間末に一度に認識します(後述)。

仕訳④:仕入代金を支払

仕入代金500のうち300について、外部業者に支払いました。
(残りの200は翌月支払の契約と仮定します)
仕訳は以下の通りです。

df_siwake = siwake.entry(df_siwake, 20200420,
                         [['買掛金', 300]],
                         [['現金', 300]])

df_siwake[df_siwake['仕訳番号']==siwake.siwake_no]
index 仕訳番号 日付 借方科目 借方金額 貸方科目 貸方金額
8 5 20200420 買掛金 300 0
9 5 20200420 0 現金 300

仕入代金を支払うことで、会社の資産である現金借方)が減りますから、
その反対である貸方に記載します。
ルール③:資産の増加は借方、負債と純資産の増加は貸方に記載する
ルール'③:資産の減少は貸方、負債と純資産の減少は借方に記載する

一方、仕入先に代金を支払う債務である買掛金貸方)も、支払完了と共になくなるため、
その反対である借方に記載します。
ルール③:資産の増加は借方、負債と純資産の増加は貸方に記載する
ルール'③:資産の減少は貸方、負債と純資産の減少は借方に記載する

仕訳⑤:販売代金を回収

販売代金200について、販売先から現金で回収しました。
(残りの100は翌月回収の契約と仮定します)
仕訳は以下の通りです。

df_siwake = siwake.entry(df_siwake, 20200430,
                         [['現金', 200]],
                         [['売掛金', 200]])

df_siwake[df_siwake['仕訳番号']==siwake.siwake_no]
index 仕訳番号 日付 借方科目 借方金額 貸方科目 貸方金額
10 6 20200430 現金 200 0
11 6 20200430 0 売掛金 200

販売代金を回収することで、会社の資産である現金借方)が増えますから、
借方に記載します。
ルール③:資産の増加は借方、負債と純資産の増加は貸方に記載する

一方、販売先から代金をもらう債権である売掛金借方)も、回収完了と共になくなるため、
その反対である貸方に記載します。
ルール③:資産の増加は借方、負債と純資産の増加は貸方に記載する
ルール'③:資産の減少は貸方、負債と純資産の減少は借方に記載する

ここまで

  1. 会社を設立
  2. 商品の仕入
  3. 商品の販売
  4. 仕入代金の支払
  5. 販売代金の回収

という一連の仕訳を作成しました。仕訳帳 df_siwake の中身を確認してみます。

df_siwake
index 仕訳番号 日付 借方科目 借方金額 貸方科目 貸方金額
0 1 20200401 現金 1000 0
1 1 20200401 0 資本金 1000
2 2 20200402 商品 500 0
3 2 20200402 0 買掛金 500
4 3 20200403 売掛金 300 0
5 3 20200403 0 売上 300
6 4 20200403 売上原価 200 0
7 4 20200403 0 商品 200
8 5 20200420 買掛金 300 0
9 5 20200420 0 現金 300
10 6 20200430 現金 200 0
11 6 20200430 0 売掛金 200

2.残高試算表の作成

【やりたいこと】
仕訳帳から勘定科目ごとの借方金額、貸方金額を集計し、残高試算表(T/B:Trial Balance)を作成します。   

【コーディングの方針】
仕訳帳 df_siwake から残高試算表 df_TB が生成されるよう、
クラス TrialBalance 内に、関数 create_tb および勘定科目のDataFrame df_kamokuを定義します。

最終的に財務諸表を作成することを見据え、残高試算表の各勘定科目に以下の属性を与えるよう、
df_kamokuを定義します(注3)。

(注3)
実務上は仕訳帳や残高試算表などの他に、勘定科目マスタを用意し、科目コードや貸借区分などの属性を定義します。

仕訳の作成も科目コードで入力し、勘定科目名はマスタから引用するのが一般的です。

  • 科目コード:科目の表示順
  • 科目分類:資産、負債、純資産のいずれに属するか
  • BS/PL:B/Sの科目か、P/Lの科目か
  • 貸借区分:借方(資産)、貸方(負債、純資産)いずれに属するか
df_kamoku = pd.DataFrame({'貸借科目': ['現金', '売掛金', '商品', '買掛金', '資本金', '繰越利益剰余金', '売上', '売上原価', '当期利益'],
                          '科目コード': ['100', '110', '120', '200', '300', '310', '400', '500', '600'],
                          '科目分類': ['資産', '資産', '資産', '負債', '負債', '純資産', '収益', '費用', '利益'],
                          'BS/PL': ['BS', 'BS', 'BS', 'BS', 'BS', 'BS', 'PL', 'PL', 'PL'],
                          '貸借区分': [1, 1, 1, -1, -1, -1, -1, 1, 1]})

そして df_kamoku と、仕訳日記帳 df_siwake の金額を勘定科目ごとに集計したテーブル

df = df_siwake.groupby('貸借科目').sum()[['貸借金額']]

を結合します。

なおPandasでの集計の都合上、借方をプラス、貸方をマイナスと考えたほうが扱いやすいため(注4)、

  • 貸借金額:借方金額 - 貸方金額

とのカラムを作成します。

(注4)
「集計の都合上」と書きましたが、借方と貸方、プラスとマイナスとの概念は同値と言えます。
- プラスの反対がマイナス、マイナスの反対がプラス
- 借方の反対が貸方、貸方の反対が借方
- プラスの数とマイナスの数を足すとゼロになる
- 借方と貸方の金額は一致する
「借方」「貸方」という言葉の分かりにくさが、簿記を初めて学ぶ上での障害となりますが、
数学に親しい方は、プラスとマイナスの概念に置き換えれば理解しやすいかもしれません。

class TrialBalance:

    df_kamoku = pd.DataFrame({'貸借科目': ['現金', '売掛金', '商品', '買掛金', '資本金', '繰越利益剰余金', '売上', '売上原価', '当期利益'],
                              '科目コード': ['100', '110', '120', '200', '300', '310', '400', '500', '600'],
                              '科目分類': ['資産', '資産', '資産', '負債', '負債', '純資産', '収益', '費用', '利益'],
                              'BS/PL': ['BS', 'BS', 'BS', 'BS', 'BS', 'BS', 'PL', 'PL', 'PL'],
                              '貸借区分': [1, 1, 1, -1, -1, -1, -1, 1, 1]})

    def create_tb(self, df_siwake):
        df = df_siwake.copy()
        df['貸借科目'] = df['借方科目'] + df['貸方科目'] 
        df['貸借金額'] = df['借方金額'] - df['貸方金額'] 
        df = df.groupby('貸借科目').sum()[['貸借金額']]

        df_merge = pd.merge(df, self.df_kamoku, on='貸借科目').sort_values('科目コード').reset_index(drop=True)

        return df_merge

TrialBalanceクラスのインスタンスを生成し、関数 create_tb に仕訳日記帳 df_siwake を渡して
残高試算表 df_TB を作成します。

TB = TrialBalance()

df_TB = TB.create_tb(df_siwake)
df_TB
index 貸借科目 貸借金額 科目コード 科目分類 BS/PL 貸借区分
0 現金 900 100 資産 BS 1
1 売掛金 100 110 資産 BS 1
2 商品 300 120 資産 BS 1
3 買掛金 -200 200 負債 BS -1
4 資本金 -1000 300 負債 BS -1
5 売上 -300 400 収益 PL -1
6 売上原価 200 500 費用 PL 1

科目ごとの貸借金額(借方の場合+、貸方の場合-)が集計され、科目コードや科目分類などの属性が付与されています。

あとは残高試算表を、貸借対照表(B/S)、損益計算書(P/L)に分割するだけですが、
単純に分割しただけでは貸借対照表の借方金額、貸方金額が一致しないという問題が生じます。

確認のため、残高試算表のうち科目分類が「BS」であるものだけを取り出します。

df_TB[df_TB['BS/PL']=='BS']
index 貸借科目 貸借金額 科目コード 科目分類 BS/PL 貸借区分
0 現金 900 100 資産 BS 1
1 売掛金 100 110 資産 BS 1
2 商品 300 120 資産 BS 1
3 買掛金 -200 200 負債 BS -1
4 資本金 -1000 300 負債 BS -1

一見問題ないように見えますが、この貸借対照表(B/S)は
ルール②:借方、貸方それぞれの合計金額は必ず一致する
を満たしていません。

借方金額(貸借金額が+)、貸方金額(貸借金額が-)をそれぞれ集計してみます。

df_TB[(df_TB['BS/PL']=='BS') & (df_TB['貸借金額'] > 0)]
index 貸借科目 貸借金額 科目コード 科目分類 BS/PL 貸借区分
0 現金 900 100 資産 BS 1
1 売掛金 100 110 資産 BS 1
2 商品 300 120 資産 BS 1
df_TB[(df_TB['BS/PL']=='BS') & (df_TB['貸借金額'] < 0)]
index 貸借科目 貸借金額 科目コード 科目分類 BS/PL 貸借区分
3 買掛金 -200 200 負債 BS -1
4 資本金 -1000 300 負債 BS -1
print('借方金額合計:',df_TB[(df_TB['BS/PL']=='BS') & (df_TB['貸借金額'] > 0)].sum()['貸借金額'])
print('貸方金額合計:',df_TB[(df_TB['BS/PL']=='BS') & (df_TB['貸借金額'] < 0)].sum()['貸借金額'])
借方金額合計: 1300
貸方金額合計: -1200

貸借対照表(B/S)における借方金額の合計と、貸方金額の合計が一致していないことがわかります。

この貸借対照表(B/S)における差額は、当期の損益計算書(P/L)における収益と費用の差額、つまり当期利益に一致します。

収益と費用の差額から当期利益を算出する手続きを損益振替といいます。

また損益計算書(P/L)における当期利益を、貸借対照表(B/S)の繰越利益剰余金という勘定に振り替える手続きを資本振替といいます。

資本振替の仕訳は以下のとおりです。

df_siwake = siwake.entry(df_siwake, 20200430,
                         [['当期利益', 100]],
                         [['繰越利益剰余金', 100]])

df_siwake[df_siwake['仕訳番号']==siwake.siwake_no]
index 仕訳番号 日付 借方科目 借方金額 貸方科目 貸方金額
12 7 20200430 当期利益 100 0
13 7 20200430 0 繰越利益剰余金 100

なお当期利益は売上300と売上原価200の差額100です。

損益振替および資本振替を行った後の仕訳帳 df_siwake および残高試算表 df_TB を見てみます。

df_siwake
index 仕訳番号 日付 借方科目 借方金額 貸方科目 貸方金額
0 1 20200401 現金 1000 0
1 1 20200401 0 資本金 1000
2 2 20200402 商品 500 0
3 2 20200402 0 買掛金 500
4 3 20200403 売掛金 300 0
5 3 20200403 0 売上 300
6 4 20200403 売上原価 200 0
7 4 20200403 0 商品 200
8 5 20200420 買掛金 300 0
9 5 20200420 0 現金 300
10 6 20200430 現金 200 0
11 6 20200430 0 売掛金 200
12 7 20200430 当期利益 100 0
13 7 20200430 0 繰越利益剰余金 100
df_TB = TB.create_tb(df_siwake)
df_TB
index 貸借科目 貸借金額 科目コード 科目分類 BS/PL 貸借区分
0 現金 900 100 資産 BS 1
1 売掛金 200 110 資産 BS 1
2 商品 300 120 資産 BS 1
3 買掛金 -200 200 負債 BS -1
4 資本金 -1000 300 負債 BS -1
5 繰越利益剰余金 -1000 300 負債 BS -1
6 売上 -300 400 収益 PL -1
7 売上原価 200 500 費用 PL 1
8 当期利益 100 500 利益 PL 1
print('借方金額合計:',df_TB[(df_TB['BS/PL']=='BS') & (df_TB['貸借金額'] > 0)].sum()['貸借金額'])
print('貸方金額合計:',df_TB[(df_TB['BS/PL']=='BS') & (df_TB['貸借金額'] < 0)].sum()['貸借金額'])
借方金額合計: 1300
貸方金額合計: -1300

損益振替および資本振替を行うことで、貸借対照表(B/S)の借方金額および貸方金額が一致するこどが確認できました。

3. 貸借対照表(B/S)、損益計算書(P/L)の作成

【やりたいこと】
残高試算表を貸借対照表(B/S)、損益計算書(P/L)に分割します。

【コーディングの方針】
残高試算表 df_TBを「BS/PL」列が'BS'か、'PL'かにより分割します。

なお「貸借金額」に「貸借区分」(+1、-1)を乗じることで、借方(+)貸方(-)それぞれの金額を絶対値に戻し、
「表示貸借」という列に格納します。

BS = df_TB[df_TB['BS/PL']=='BS'].reset_index(drop=True)
BS['表示貸借'] = BS['貸借金額'] * BS['貸借区分']
BS[['科目コード','科目分類', '貸借科目', '表示貸借']]
index 科目コード 科目分類 貸借科目 表示貸借
0 100 資産 現金 900
1 110 資産 売掛金 100
2 120 資産 商品 300
3 200 負債 買掛金 200
4 300 負債 資本金 1000
5 310 純資産 繰越利益剰余金 100
PL = df_TB[df_TB['BS/PL']=='PL'].reset_index(drop=True)
PL['表示貸借'] = PL['貸借金額'] * PL['貸借区分']
PL[['科目コード','科目分類', '貸借科目', '表示貸借']]
index 科目コード 科目分類 貸借科目 表示貸借
0 400 収益 売上 300
1 500 費用 売上原価 200
2 600 利益 当期利益 100

貸借対照表(B/S)、損益計算書(P/L)が作成できました。
改めてイメージ図を表すと以下のとおりです。

image.png
image.png

もう一度振り返りますと、

1. 仕訳の作成

  取引を仕訳として仕訳帳に記録する。

2. 残高試算表(T/B)の作成

  仕訳帳から勘定科目ごとの借方金額、貸方金額を集計し、残高試算表を作成する。

3. 貸借対照表(B/S)、損益計算書(P/L)の作成

  残高試算表を科目により貸借対照表(B/S)損益計算書(P/L)に分割する。

との流れで、日々の取引を集計して財務諸表を作成することが、簿記一巡の手続きです。

補足:仕訳作成時のエラーチェック

仕訳入力用のクラス Siwake に②データ型チェック、③貸借一致チェックの関数を定義し、

  • 仕訳のデータ型が正しいか、入力時に確認する …②
  • 仕訳の貸借が一致しているか、入力時に確認する …③

ように実装します。

class Siwake:
    def __init__(self):
        self.siwake_no = 0

    def entry(self, df, date, kari, kashi):             # ...①複合仕訳に対応
            if self.check_keishiki(date, kari, kashi):  # ...②データ型チェック
                if self.check_taisyaku(kari, kashi):    # ...③貸借一致チェック
                    self.siwake_no += 1                 # ...仕訳番号を更新

                    for i in range(len(kari)): #複合仕訳に対応するため[借方科目、借方金額]の数だけループを回す

                        # 仕訳番号、日付、借方科目、借方金額、貸方科目、貸方金額をSereis化し、DataFrameに格納
                        # なお貸方科目は""、貸方金額は0とする
                        kari_entry = pd.Series([self.siwake_no] + [date] + kari[i] + ["", 0], index=df.columns)          
                        df = df.append(kari_entry, ignore_index=True)

                    for i in range(len(kashi)): #複合仕訳に対応するため[貸方科目、貸方金額]の数だけループを回す

                        # 仕訳番号、日付、借方科目、借方金額、貸方科目、貸方金額をSereis化し、DataFrameに格納
                        # なお借方科目は""、借方金額は0とする
                        kashi_entry = pd.Series([self.siwake_no] + [date] + ["", 0] + kashi[i], index=df.columns)
                        df = df.append(kashi_entry, ignore_index=True)

                    return df

                else:
                    print("エラー:貸借金額を一致させてください")
                    return df

            else:
                pass


    def check_keishiki(self, date, kari, kashi):# ...②データ型チェック
        for i , k in zip(range(len(kari)),range(len(kashi))):
            if type(date) != int or len(str(date)) != 8:
                print("エラー:日付は8桁の整数yyyymmddで入力してください")
                return False                       

            elif len(kari[i]) != 2:
                print("エラー:借方科目、借方金額のみ入力してください")
                return False                

            elif type(kari[i][0]) != str or type(kari[i][1]) != int:
                print("エラー:データ型は 借方科目→文字列、借方金額→数値 としてください")
                return False

            elif len(kashi[k]) != 2:
                print("エラー:貸方科目、貸方金額のみ入力してください")
                return False                

            elif type(kashi[k][0]) != str or type(kashi[k][1]) != int:
                print("エラー:データ型は 貸方科目→文字列、貸方金額→数値 としてください")
                return False

            else:
                return True                    

    def check_taisyaku(self, kari, kashi):# ...③貸借一致チェック
        kari_sum = 0
        kashi_sum = 0

        for i in range(len(kari)):
                kari_sum += kari[i][1]

        for i in range(len(kashi)):
                kashi_sum += kashi[i][1]       

        if kari_sum != kashi_sum:
            return False

        else:
            return True

Siwakeクラスのインスタンスを生成し、不正な仕訳を作成していきます。

siwake = Siwake()

「日付」に不正な値を入れてみます。

df_siwake = siwake.entry(df_siwake, 1201,
             [['現金', 500]],
             [['資本金', 1000]])
エラー:日付は8桁の整数yyyymmddで入力してください

貸借金額が一致していない仕訳を入れてみます。

df_siwake = siwake.entry(df_siwake, 20200401,
             [['現金', 500]],
             [['資本金', 1000]])
エラー:貸借金額を一致させてください

余計な項目の入った仕訳を入れてみます。

df_siwake = siwake.entry(df_siwake, 20200401,
             [['現金', 500, '業者A']],
             [['資本金', 1000, '業者A']])
エラー:借方科目、借方金額のみ入力してください

データ型が不正な仕訳を入れてみます。

df_siwake = siwake.entry(df_siwake, 20200401,
             [['現金', 500]],
             [['資本金', '1000']])
エラー:データ型は 貸方科目→文字列、貸方金額→数値 としてください

実務上は入力値が空でないかのチェック、勘定科目が科目マスタにあるかのチェックなど、
様々なチェック項目が考えられますが、本稿では最低限のチェックのみ実施しています。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【AWS】Lambdaの実行ログから指定した文字列を検知してslackに通知

今回は自分が触れる機会の多いAWSのサービスを使用して運用の際などで利用できる通知方法の一つをご紹介します。

使用サービス

  • Lambda
  • CloudWatch
  • Slack

実装フロー

1.Lambda関数からCloudWatch Logsへログを出力
2.CloudWatch Logsから指定した文字列を検知
3.検知された文字列をトリガーにSlack通知用のLambda関数を発火
4.通知用LambdaでSlackへ文字列を通知

フロー詳細

実際に上記のフローで実装していきます。

手順1

今回は「test」という文字列でCloudWatch Logsへ出力します。
スクリーンショット 2020-12-23 16.14.07.png

手順2

文字列検知にCloudWatch Logsのサブスクリプションフィルター(Lambda)を使用していきます。
スクリーンショット 2020-12-23 16.25.49.png

手順3

通知用Lambda関数とログ形式およびフィルターパターンを設定します
スクリーンショット 2020-12-23 16.35.33.png

手順4

環境変数「WebhookURL」には通知したいslackのWebhook URLを設定しています。

notifi.py
import json
import os
import urllib.request
import slackweb

def lambda_handler(event, context):
    decoded_data = zlib.decompress(
        base64.b64decode(event['awslogs']['data']),
        16+zlib.MAX_WBITS
    )
    json_data = json.loads(decoded_data) 
    print(json_data['logEvents'])
    for i in json_data['logEvents']:
        test = i['message'] # ログの内容
        slack = slackweb.Slack(url=os.environ['WebhookURL'])
        slack.notify(text=test)

先ほど設定した文字列を検知してslackに通知されることを確認しました。
スクリーンショット 2020-12-23 16.48.51.png

まとめ

上記以外にも通知方法は複数ありますが、なるべく簡潔にまとめていった方が管理しやすいので簡略化できるところはどんどん楽にしていきたいですね。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

PythonでYouTube API を叩いてみる

この記事は福島高専 Advent Calendar 2020 25日目の記事です。
本記事はAPIについての記事です。内容の悪用は厳禁でお願いします。

はじめに

昨今、APIの重要性について述べられている記事や動画などをちらほらと見かけるようになってきたので、自分もPythonの勉強がてら触ってみることにしました。まだまだ初学者なのでスマートなコードを書けませんが、よろしくお願いします。

概要

取り敢えず、比較的メジャーだと思われるYouTube Data APIを叩いてYouTubeチャンネルや動画のデータを取得して、解析してみたいと思います。下記の事前準備は既に済んでいるという前提です。

事前準備

  • YouTube Data API v3のAPIキーを取得しておく
  • Pythonをインストールし開発環境を整える

テスト実装

取り敢えず、APIの使い方を軽く理解するためにいくつかメソッドを利用してみます。因みにAPIのQuotas(1日の使用量)は10000が上限です。テストを繰り返していると案外あっさり到達する可能性があるので注意してください。

必要なパッケージのインストール

まず、PythonでYouTube Data APIを使用するためのパッケージをインストールします。

パッケージのインストール
$ pip install google-api-python-client

検索クエリを含むYouTubeチャンネルを取得する

手始めに検索ワードを含むYouTubeチャンネルを取得する処理を実装します。

getChannel.py
from apiclient.discovery import build

API_KEY = '<API_KEY>' #取得したAPIキー
YOUTUBE_API_SERVICE_NAME = 'youtube'
YOUTUBE_API_VERSION = 'v3'

youtube = build(
    YOUTUBE_API_SERVICE_NAME,
    YOUTUBE_API_VERSION,
    developerKey = API_KEY
)

SEARCH_QUELY = input('Query >> ')
response = youtube.search().list(
    q=SEARCH_QUELY, 
    part='id,snippet', 
    maxResults=10,
    type='channel').execute()

for item in response.get('items', []):
    print(item['snippet']['title'])

スクリプトを実行してキーワードを入力すると該当するチャンネル10個が一覧で出力されます。<API_KEY>には各自取得したAPIキーを入れてください。

response = youtube.search().list(
    q=SEARCH_QUELY, 
    part='id,snippet', 
    maxResults=10,
    type='channel').execute()

肝となるのはここでしょうか。search().list()メソッドの引数に各パラメータを与えることで取得したい情報を設定できます。パラメータの設定によって、チャンネルだけでなく動画や再生リストも取得できるようです。

for item in response.get('items', []):
    print(item['snippet']['title'])

データはjson形式で返ってくるため、getで必要な情報を抜き出します。パラメータや返り値の詳細な形式はYouTube Data API Referenceで確認してください。

指定したチャンネルの動画データを取得する

特定のチャンネルのIDを指定することでそのチャンネルの動画情報を取得できます。

getVideos.py
from apiclient.discovery import build

API_KEY = '<API key>'
YOUTUBE_API_SERVICE_NAME = 'youtube'
YOUTUBE_API_VERSION = 'v3'
CHANNEL_ID = '<Channel ID>'

youtube = build(
    YOUTUBE_API_SERVICE_NAME,
    YOUTUBE_API_VERSION,
    developerKey=API_KEY
)

response = youtube.search().list(
    part = "snippet",
    channelId = CHANNEL_ID,
    maxResults = 5,
    order = "date",
    type ='video'
    ).execute()

for item in response.get("items", []):
    print(item['snippet']['title'])

先ほどのコードを多少改変しただけです。search().list()メソッドのパラメータが増えていますね。channelIdを指定すると該当するチャンネルの動画情報を最大値maxResultsまで取得することができます。orderでレスポンスのソート方法を指定できます。dateは日付順ですね。

動画のコメントを取得する

特定の動画のコメントを取得する処理です。

getComments.py
import json
import requests
from apiclient.discovery import build

URL = 'https://www.googleapis.com/youtube/v3/'
API_KEY = '<API_KEY>'
YOUTUBE_API_SERVICE_NAME = 'youtube'
YOUTUBE_API_VERSION = 'v3'
VIDEO_ID = '<Video ID>'

params = {
    'key': API_KEY,
    'part': 'snippet',
    'videoId': VIDEO_ID,
    'order': 'relevance',
    'textFormat': 'plaintext',
    'maxResults': 100,
}

response = requests.get(URL + 'commentThreads', params=params)
resource = response.json()

for item in resource['items']:
    name = item['snippet']['topLevelComment']['snippet']['authorDisplayName']
    like_cnt = item['snippet']['topLevelComment']['snippet']['likeCount']
    text = item['snippet']['topLevelComment']['snippet']['textDisplay']
    print('ユーザ名: {}\n{}\nグッド数: {}\n'.format(name, text, like_cnt))

チャンネルと同様にIDを指定すると特定の動画のコメントを取得できます。

response = requests.get(URL + 'commentThreads', params=params)
resource = response.json()

URLに指定したパラメータを繋いでrequestしています。

for item in resource['items']:
    name = item['snippet']['topLevelComment']['snippet']['authorDisplayName']
    like_cnt = item['snippet']['topLevelComment']['snippet']['likeCount']
    text = item['snippet']['topLevelComment']['snippet']['textDisplay']
    print('ユーザ名: {}\n{}\nグッド数: {}\n'.format(name, text, like_cnt))

例の如く、レスポンスはjsonなので必要な情報を抜き出しています。今回はコメントしたユーザ名、本文、グッド数を取得していますが、他にも返信数や子コメントなども取得可能です。

データ解析

APIの使い方がなんとなく掴めたところで軽いデータ解析を行ってみます。データ解析といっても上記のコードを元に特定の動画のコメントを取得してCSV出力するだけの簡単なものです。具体的には

  • 検索ワードを入力して関連するチャンネルを取得する
  • チャンネルを指定して動画を取得する
  • 動画を指定してコメントを取得する
  • CSVに書き出す

といった処理を実装してみます。youtube_api.pyで作成します。

チャンネル取得

検索ワードを入力して関連するチャンネルタイトルをナンバリングしてを一覧で表示させます。

youtube_api.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import requests
import pandas as pd
from apiclient.discovery import build

URL = 'https://www.googleapis.com/youtube/v3/'
API_KEY = '<API_KEY>'
YOUTUBE_API_SERVICE_NAME = 'youtube'
YOUTUBE_API_VERSION = 'v3'
SEARCH_QUELY =''

youtube = build(
    YOUTUBE_API_SERVICE_NAME,
    YOUTUBE_API_VERSION,
    developerKey = API_KEY
)

def getChannel():
    channel_list = []
    num = 0
    search_res = youtube.search().list(
        q=SEARCH_QUELY, 
        part='id,snippet', 
        maxResults=10,
        type='channel',
        order='rating'
    ).execute()

    for item in search_res.get('items', []):
        num += 1
        channel_dict = {'num':str(num),'type':item['id']['kind'],'title':item['snippet']['title'],'channelId':item['snippet']['channelId']}
        channel_list.append(channel_dict)

    print('***Channel list***')
    for data in channel_list:
        print("Channel " + data["num"] + " : " + data["title"])
    print('******************')

    return getId(input('Channel Number>> '),channel_list)

軽く解説です。search()のパラメータを関連チャンネル10個をリソースの評価が高い順に取得するように設定しています。
リソースのタイトルとChannelIDをdictionary型に格納しています。numは一覧から特定のチャンネルを指定するための番号です。dictionaryをさらにlistに格納します。選択したいチャンネルの番号を入力するとChannelIDを返します。

動画取得

続いて指定したChannelIDから動画を取得して表示させるコードを追加します。

youtube_api.py
def getVideos(_channelId):
    video_list = []
    num = 0
    video_res = youtube.search().list(
        part = 'snippet',
        channelId = _channelId,
        maxResults = 100,
        type = 'video',
        order = 'date'
    ).execute()

    for item in video_res.get("items",[]):
        num += 1
        video_dict = {'num':str(num),'type':item['id']['kind'],'title':item['snippet']['title'],'videoId':item['id']['videoId']}
        video_list.append(video_dict)

    print('***Video list***')
    for data in video_list:
        print("Video " + data["num"] + " : " + data["title"])
    print('****************')

    return getId(input('Video Number>> '),video_list)

同じようなことをやっているだけなので説明は省略。

コメント取得

さらに動画からコメントを取得するコードを追加します。

youtube_api.py
def getComments(_videoId):
    global API_KEY
    comment_list = []
    params = {
        'key': API_KEY,
        'part': 'snippet',
        'videoId': _videoId,
        'order': 'relevance',
        'textFormat': 'plaintext',
        'maxResults': 100,
    }

    response = requests.get(URL + 'commentThreads', params=params)
    resource = response.json()

    for item in resource['items']:
        text = item['snippet']['topLevelComment']['snippet']['textDisplay']
        comment_list.append([item['snippet']['topLevelComment']['snippet']['authorDisplayName'],
                             item['snippet']['topLevelComment']['snippet']['likeCount'],
                             item['snippet']['topLevelComment']['snippet']['textDisplay']])
    return comment_list

VideoIDで指定した動画からコメント100件のユーザ名、本文、グッド数をlistに格納しています。

CSV出力

DataFrameに格納して出力しているだけです。

youtube_api.py
def dataList(_comment_list):
    if(_comment_list != []):
        param=['User name', 'Like count', 'text']
        df = pd.DataFrame(data = _comment_list,columns=param)
        df.to_csv("comments.csv")
        print('Output csv')
    else:
        print('None comment')

全体コード

youtube_api.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import requests
import pandas as pd
from apiclient.discovery import build

URL = 'https://www.googleapis.com/youtube/v3/'
API_KEY = '<API_KEY>'
YOUTUBE_API_SERVICE_NAME = 'youtube'
YOUTUBE_API_VERSION = 'v3'
SEARCH_QUELY =''

youtube = build(
    YOUTUBE_API_SERVICE_NAME,
    YOUTUBE_API_VERSION,
    developerKey = API_KEY
)

def run():
    global SEARCH_QUELY
    SEARCH_QUELY = input('検索ワード>> ')
    dataList(getComments(getVideos(getChannel())))

def getId(_num,_items):
    for data in _items:
        if data['num'] == _num:
            if data['type'] == 'youtube#channel':
                return data['channelId']
            else:
                return data['videoId']
    return ''

def getChannel():
    channel_list = []
    num = 0
    search_res = youtube.search().list(
        q=SEARCH_QUELY, 
        part='id,snippet', 
        maxResults=10,
        type='channel',
        order='rating'
    ).execute()

    for item in search_res.get('items', []):
        num += 1
        channel_dict = {'num':str(num),'type':item['id']['kind'],'title':item['snippet']['title'],'channelId':item['snippet']['channelId']}
        channel_list.append(channel_dict)

    print('***Channel list***')
    for data in channel_list:
        print("Channel " + data["num"] + " : " + data["title"])
    print('******************')

    return getId(input('Channel Number>> '),channel_list)

def getVideos(_channelId):
    video_list = []
    num = 0
    video_res = youtube.search().list(
        part = 'snippet',
        channelId = _channelId,
        maxResults = 100,
        type = 'video',
        order = 'date'
    ).execute()

    for item in video_res.get("items",[]):
        num += 1
        video_dict = {'num':str(num),'type':item['id']['kind'],'title':item['snippet']['title'],'videoId':item['id']['videoId']}
        video_list.append(video_dict)

    print('***Video list***')
    for data in video_list:
        print("Video " + data["num"] + " : " + data["title"])
    print('****************')

    return getId(input('Video Number>> '),video_list)

def getComments(_videoId):
    global API_KEY
    comment_list = []
    params = {
        'key': API_KEY,
        'part': 'snippet',
        'videoId': _videoId,
        'order': 'relevance',
        'textFormat': 'plaintext',
        'maxResults': 100,
    }

    response = requests.get(URL + 'commentThreads', params=params)
    resource = response.json()

    for item in resource['items']:
        text = item['snippet']['topLevelComment']['snippet']['textDisplay']
        comment_list.append([item['snippet']['topLevelComment']['snippet']['authorDisplayName'],
                             item['snippet']['topLevelComment']['snippet']['likeCount'],
                             item['snippet']['topLevelComment']['snippet']['textDisplay']])
    return comment_list

def dataList(_comment_list):
    if(_comment_list != []):
        param=['User name', 'Like count', 'text']
        df = pd.DataFrame(data = _comment_list,columns=param)
        df.to_csv("comments.csv")
        print('Output csv')
    else:
        print('None comment')

#実行
run()

実行

では早速動かしてみましょう。youtube_api.pyを実行してください。適当なワードを入力してみます。

execute1
チャンネル番号を指定。↓
execute2.png
動画番号を指定。↓
execute3.png
無事CSVに出力できたら成功です。お疲れ様でした。
今回はコメントを抜き出すだけの簡単な処理でしたが、チャンネルや動画のデータをグラフ化したりしてみても面白そうですね。また、別のAPIを使えばコメント文を感情分析してアンチコメントを割り出すみたいなこともできるようです。
今回使ったコードはGitHubにあるのでそちらを参照してください。

編集後記

本当は別の内容を書きたかったのですが、時間の都合上でただAPIを叩いただけの薄い内容になってしまいました。ただAPIを利用するスキルは磨いておいて損はないのかなと思ったりしています。
本記事の内容に改善点やアドバイスなどがありましたら、よろしくお願いします。

参考文献

GitHub

https://github.com/Milkly-D/youtube_API.git

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Opencvについて①

Opencv3系についてのメモ

基本的には公式ドキュメントを確認しながらの物。

環境

windows10 home 64bit
Python 3.7.9
opencv 3.4.2.17
Jupyter Notebook使用(Anaconda不使用)


使用画像について

画像については自分で撮影した肉を使用。
形式としては.pngを使用。
.jpgでも問題なし

opencv_test.ipynb
cap_dir = "使用画像を保存したフォルダを指定"

以後はすべてこの変数を使用しています。

①画像読み込みについて

1)cv2.imread(第1引数、第2引数)

opencv_test.ipynb
img1 = cv2.imread(cap_dir,0)
img2 = cv2.imread(cap_dir,1)
img3 = cv2.imread(cap_dir,-1)
img4 = cv2.imread(cap_dir,cv2.IMREAD_GRAYSCALE)
img5 = cv2.imread(cap_dir,cv2.IMREAD_COLOR)
img6 = cv2.imread(cap_dir,cv2.IMREAD_UNCHANGED)

第1引数:読み込みたい画像
第2引数:画像に関するフラグ

普通に使用するだけなら0 or 1で問題ないと思います。
読み込み後の処理でどうするかなのでカラーで使うかグレーで使うかでいいと思います。

むしろアルファチャンネルなる物は初めて見ました。
画像みてもわかりません。

スクリーンショット (333).pngスクリーンショット (334).pngスクリーンショット (335).png


②画像表示について

1)cv2.imshow(第1引数、第2引数)
2)cv2.waitKey(第1引数)
3)cv2.destroyAllWindows(第1引数)
4)cv2.namedWindow(第1引数、第2引数)

opencv_test.ipynb
img = cv2.imread(cap_dir,1)

cv2.namedWindow('niku',cv2.WINDOW_NORMAL) 
cv2.imshow('niku',img)
cv2.waitKey(0)
cv2.destroyAllWindows()

1)cv2.imshow(第1引数、第2引数)
第1引数:表示ウインドウ名(文字型で入力)
第2引数:表示したい画像

2)cv2.waitKey(第1引数)
第1引数:キーボード入力待ち時間(ms単位)
ウエイトと思っていたが、公式曰く入力受付時間らしい
0にしておけば無制限入力待ちとなる。

3)cv2.destroyAllWindows(第1引数)
第1引数:未入力なら開いているすべてのウインドウを閉じる。
複数開いているときに閉じたいウインドウ名を入力すると
入力した名前のウインドウを閉じる

4)cv2.namedWindow(第1引数、第2引数)
第1引数:ウインドウ名(文字型)
第2引数:デフォルトでcv2.WINDOW_AUTOSIZEが設定されている。
通常開いた画像はウインドウの大きさは固定だが
第2引数にcv2.WINDOW_NORMALと入力すると
マウス操作で大きさを任意で変更出来る様になる。

ただ注意として、第1引数をimshowの第1引数で設定した名前と違う名前にすると
画像のない別ウインドウが立ち上がって意味がなくなる。

下記は名前が一致しない場合
ウインドウの大きさがいじれるのは左側
スクリーンショット (338).png


③画像保存について

1)cv2.imwrite(第1引数、第2引数)

opencv_test.ipynb
cv2.imwrite('test.png',img)

第1引数:保存画像名
第2引数:保存画像

これで画像を保存するとスクリプトがある場所と同じ所に画像が保存される。

opencv_test.ipynb
save_dir = "画像を保存したいディレクトリ"
save_name = "画像名"
save_ext = ".png"

cv2.imwrite((save_dir + save_name + save_ext),img)

とすれば指定した場所に画像を保存する事が出来ます。

多分、上記の様な使い方がデフォになると思います。


④まとめ

他の物も順次まとめていく予定です。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

pandasのdataframeの要素を通常のstring型へ変換

res = df.loc[ <インデックス名>, <カラム名> ]

とすればいいです。

df.loc[ <インデックス名>, [ <カラム名> ] ]とするとできないので注意。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

PythonでGoogleDriveAPIを使ってGoogle Driveにファイルを定期的にアップロードする

背景

pythonでgoogle driveにファイルを定期的にアップロードするジョブのスクリプトが欲しい。
検索すると、PyDriveを使った記事が多いが、PyDriveはメンテナンスがあまりされていないし、Google Drive APIのv2を使っていて今使うのは得策ではなさそう。
今回は GoogleDriveAPIを使って実装してみる。

参考リンク集

Python Quickstart

  • GoogleDriveAPIの雰囲気が掴める掴む
  • この例では一度コンソールでログインする必要がある
    • 今回の仕様としては、コンソールログインなどのユーザーアクションなしで定期アップロードを行うようにしたいので別の認証を考える必要がある

Upload file data

  • ファイルアップロードのサンプル
    • Pythonのサンプルが少なくてしっくりこない

Lambda(Python)でGoogle Driveへファイルアップロード

  • GCPのサービスアカウントを使用したサンプルが紹介されている
  • このqiita記事が一番参考になった

実装サンプル

sample.py
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from oauth2client.service_account import ServiceAccountCredentials
import os

def uploadFileToGoogleDrive(fileName, localFilePath):
    service = getGoogleService()
    # "parents": ["****"]この部分はGoogle Driveに作成したフォルダのURLの後ろ側の文字列に置き換えてください。
    file_metadata = {"name": fileName, "mimeType": "text/csv", "parents": ["****"] }
    media = MediaFileUpload(localFilePath, mimetype="text/csv", resumable=True)
    file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()

def getGoogleService():
    scope = ['https://www.googleapis.com/auth/drive.file']
    keyFile = 'credentials.json'
    credentials = ServiceAccountCredentials.from_json_keyfile_name(keyFile, scopes=scope)
    return build("drive", "v3", credentials=credentials, cache_discovery=False)


getGoogleService()
uploadFileToGoogleDrive("hoge", "hige.csv")
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

GitHub Actionsでprivate repositoryのPythonライブラリを含むDockerイメージをビルドする

背景・目的

Github ActionsでPythonのイメージをビルドするとき、別のprivate repositoryにある社内用ライブラリを持ってくる方法についてまとめました。

既に同僚の @elyunim26 の「GitHub Actionsでビルドするコンテナ内でGitHubのprivate repositoryをセキュアに参照する」という記事があったのですが、私はマシンユーザーではなくデプロイキーを利用していたため、安全にssh認証をする方法が必要でした。Build-time secretsを使うことでsshキーも安全にイメージのビルドに利用できます。

実装方法

簡単に説明すると、以下のような手順で実現できます。

  • Github Actionsでsecretsからデプロイキー(秘密鍵)を読み込む
  • DockerのBuildkitの機能を使ってセキュアに受け渡す
  • 通常通りpip installを行う

デプロイキーの設定

まず、ライブラリ側のリポジトリにデプロイキーを設定する必要があります。デプロイキーの設定方法はGithubの公式ドキュメントを参照してください。

Github Actionsの設定

次のような要素に気をつける必要があります。トラブルシューティングには(Pythonとnode.jsという違いはありますが)GitHub Actions で private repository の node module をインストールするが役に立ちました。

  • GITHUBで始まるsecretsは禁止されている
  • secretsでは改行が使えないため、事前に改行を\nなどに置換した形で登録して、sedで置換するなどの回避方法が必要

以下は設定例です。我々のチームではAWSを利用しているため、後半にECRにpushするコードも含まれています。

github_actions_ecr.yml
      - name: Build, tag, and push image to Amazon ECR
        id: build-image
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          ECR_REPOSITORY: ${{ steps.extract_repository.outputs.repository }}
          IMAGE_TAG: ${{ github.sha }}
          DEPLOY_KEY_GITHUB: ${{ secrets.DEPLOY_KEY_GITHUB }}
        run: |
          echo ${DEPLOY_KEY_GITHUB} > .deploy_key
          sed -i -e "s#\\\\n#\n#g" .deploy_key
          chmod 600 .deploy_key
          DOCKER_BUILDKIT=1 docker build --secret id=ssh,src=.deploy_key \
            -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG \
            -t $ECR_REGISTRY/$ECR_REPOSITORY:latest \
            .
          rm .deploy_key
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:latest

Dockerfileの設定

我々のチームではpoetryを使うことが多いのでpyproject.tomlを利用していますが、requirements.txtを使う場合でも同様の設定でいけるはずです。

Dockerfile
# syntax=docker/dockerfile:experimental
FROM python:3.7

# ssh-keyscanの実行のため、元にするイメージによってはopenssh-clientのインストールが必要
RUN apt-get install -y openssh-client git

COPY ./pyproject.toml /app/pyproject.toml
COPY ./src /app/src
WORKDIR /app

RUN mkdir -m 700 $HOME/.ssh
RUN ssh-keyscan -H github.com > $HOME/.ssh/known_hosts
RUN --mount=type=secret,id=ssh,dst=$HOME/.ssh/id_rsa \
  pip install --upgrade pip && \
  pip install . && rm pyproject.toml

pyproject.tomlでは次のように記述されています。これはpoetryの公式ドキュメントにもある通り、poetry addコマンドでリポジトリを指定することで作成されるものです。

pyproject.toml
[tool.poetry.dependencies]
{ライブラリ名} = {git = "ssh://git@github.com/{ユーザー名}/{ライブラリ名}.git", rev = "main"}

参考にした記事

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

ロングポーリングからの解放!DjangoのChannelsをしばく

挨拶

書いてある内容は公式チュートリアルと同じです。公式ドキュメント以外にDjangoのChannelsを1から10まで分かりやすく説明している親切な記事がなく、どう学習したらいいのか困ったので後の走者のために書き記しておきます。

「公式ドキュメント読んでるよね?」みたいな記事しかないのもきついですし。
公式ドキュメントが前提ならそっち読むわ!みたいなね

あと記事が古くてうまく動かない、っていうのもあると思うので。

一番いい勉強法はChannel公式ドキュメントのチュートリアルから始めることですが、まぁこの記事を見つけたなら始めなくて良いです。

想定読者は「Djangoちょっと使えるけどChannelsに手が出せない」みたいな方です。

あと、先に言っておくと僕自身Djangoに詳しくないです。
とりあえず動くWebsocketアプリが作れるまでを紹介します。

Channelsの概念

Channelsは今までのDjangoにWebsocket通信を追加します。
HTTP通信はそのままに、websocket通信が来たときのみにviewsではなくconsumerを呼び出します。

次にchannelとgroup、layerという概念があります(ドキュメントとの整合性のため英語表記します)。
channel: クライアントのメールボックスみたいなもの。これに対してメッセージを送れば結果的にクライアントに届くことになる。
group: channelの集まり。チャットの部屋みたいなもの。channelsはgroupで変化があったらgroup全体に通知する、みたいな仕組みを基本として想定しているんだと思う。
layer: groupのさらなる上位概念。何のためにあるかいまいちわからない。基本的に1個で足りるらしい。

これらを前提知識として実装していきます。別に覚えなくてもニュアンスで分かりますし、分からなくても問題ないです。

準備

chanellsをインストールしましょう
python -m pip install -U channels

つぎにプロジェクトを作ります。プロジェクト名を変えた場合は読み替えてください
django-admin startproject mysite

つぎにプロジェクト内に移動して
python3 manage.py startapp chat
これもアプリ名を変えた場合は読み替えてください

まぁここまではおまじないです。最初に形だけ作っとく、みたいなもんだと思ってください。
次にchatの中身をふっとばします。以下の形になるようにファイルを消したり作ったりしてください。

chat/
    __init__.py
    consumers.py
    routing.py
    urls.py
    views.py

ファイルを作るのはマジでいつもの感じで新しいファイルを作れば大丈夫です。

結果的にプロジェクト全体はこんな感じになると思います。

chat/
    migrations
    __init__.py
    consumers.py
    routing.py
    urls.py
    views.py
mysite/
    __init__.py
    asgi.py
    settings.py
    urls.py
    wsgi.py
db.sqlite3
manage.py

これで形だけは完成です。
asgi.pyがない場合はdjangoのバージョンが間違っている場合があります。Channelsが使えるバージョンを用意してください。

次に各ファイルをいじっていきます。

mystite/settings.py

mysite/settings.py
INSTALLED_APPS = [
    'channels',#追加
    'chat',#追加
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
]

#一番下に追加
ASGI_APPLICATION = 'mysite.asgi.application'

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer"#インメモリを使う場合
    },
}

INSTALLED_APPS内に'channels'と'アプリ名'を追加します。まぁこれはDjangoの仕様です。
一番下に追加したのは変更の分かりやすさのためです。
レイヤーとはグループが集団のことで、公式ドキュメントによると1アプリ1レイヤーが普通らしいです。
"default"はレイヤーの名前です。

現在はインメモリを使うことにしていますがこれだと同じプロセス内でしか動作しないらしいです。
なのでRedisというキャッシュサーバーを使うことで解消できます。Redisとは共用メモリみたいなもんだと思います。

#推奨 redisというキャッシュサーバーを用いた方が良い,
CHANNEL_LAYERS = {
    'default': {
        'BACKEND':'channels_redis.core.RedisChannelLayer', 
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

この場合はポート番号6379にあるRedisサーバーをキャッシュサーバーとして使っています。

mysite/urls.py

以下のようにします。

mysite/urls.py
from django.conf.urls import include
from django.urls import path
from django.contrib import admin

urlpatterns = [
    path('chat/', include('chat.urls')),
    path('admin/', admin.site.urls),
]

includeは別のURL書いてあるファイルを使うよ~ってことです。

chat/urls.py

chat/urls.py
from django.urls import path

from . import views

urlpatterns = [
    path('', views.index, name='index'),
    path('<str:room_name>/', views.room, name='room'),
]

これに関してはDjangoと全く一緒です。
urlになんもない場合はviews.pyのindexを、なんかある場合はview.pyのroomを呼び出しています。その際にroom_nameとして取り出していて、views.py内ではそれを使っています。

chat/routing.py

chat/routing.py
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]

次にrouting.pyの内容です。routing.pyはwebsocketの接続について書きます。
re_pathはURLRouterの制限のためと書いてありましたがよくわかりません。まぁこう書けばいいんでしょう。
as_asgi()は後に用意するcomsumerクラスを使うための呼び出し方です。comsumerクラスを用意してこの呼び出し方をすれば後は勝手にやってくれるっぽいです。comsumerクラスの書き方は後で紹介します。

mysite/asgi.py

mysite/asgi.py
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
import chat.routing

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")

application = ProtocolTypeRouter({
  "http": get_asgi_application(),#http://ならこっち
  "websocket": AuthMiddlewareStack(#ws:// wss://ならこっち
        URLRouter(
            chat.routing.websocket_urlpatterns#chat/routing.py
        )
    ),
})

これはHTTP接続かWebsocket通信かを判断して振り分けます。HTTPならchat/urlsに飛んでそこではviewsなどを使ってHTTPで返します。
Websocketならばrouting.pyに飛んで処理をします。

chat/consumers.py

これがwebsocketの処理になります。とりあえずまずはコピーして、下の解説を見ながら読んでください。

chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

#asyncをつけることでパフォーマンスが上がる
#websocketConsumerからAsyncWebsocketConsumerになる

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']#scopeは接続の情報を持つ、
        self.room_group_name = 'chat_%s' % self.room_name#roomのstrを作ってる

        # Join room group
        await self.channel_layer.group_add(
        #グループ参加処理、まぁこういう書き方するんだよ、ぐらい
        #ChatComsumerは同期だがchannel_layerは非同期
            self.room_group_name,
            self.channel_name
        )

        await self.accept()#websocketをacceptする,acceptしない場合rejectされる

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(#退出処理
            self.room_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to room group
        await self.channel_layer.group_send(#グループにメッセージを送る
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )

    # Receive message from room group
    async def chat_message(self, event):
        message = event['message']

        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))

まぁちょっと長いです。

もともとasyncではないのですが、asyncで実装することでスレッド呼び出しがなくなってパフォーマンスがあがる(?)らしいです。従ってawaitやasyncがついています。

connect、disconnect、receiveはchat/routing.pyにてas_asgi()として呼び出すための関数です。このように書いておけば後は勝手にやってくれるっぽいですね。connectはurlを入力して接続したとき、disconnectはcloseしたとき、receiveはsendしたときに実行されるっぽいです。

sendのtypeに特定の関数を設定しておくことで誰かがsendした際にその関数が実行され、それが結果的にサーバーからクライアントにメッセージを送ることになります。今回はchat_messageがそれに該当して結果的にグループに所属する全クライアントにsendを行っていますね。

動かす

動かす際にはここにあるindex.htmlやここにあるroom.htmlを参考にchat/Templates/chat/内にHTMLを書いておいてください。

py manage.py runserver

をしてサーバーを動かしたのちにhttp://127.0.0.1:8000/chat/lobby/ に2つのブラウザ(ウィンドウ)でアクセスします。

するとなんということでしょう!相手側にも反映されています!!!!

すごい!!!!!!!

はい。

まぁ使う場合は「JavaScript websocket」や「python websocket」、「Unity websocket」で調べてください。多分サーバーからsendされた場合はonReceiveみたいなイベント関数が用意されていると思います。以上。

参考

公式ドキュメント・チュートリアル
https://channels.readthedocs.io/en/stable/tutorial/part_2.html

stack overflow「channels without channel layer or any other free hosting」
https://stackoverflow.com/questions/53271407/channels-without-channel-layer-or-any-other-free-hosting

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Python でJSONオブジェクトマッパーを作る

この記事は、Pythonその2 Advent Calendar 2020 23日目の記事です。

自己紹介

バックエンドのエンジニアとしてそろそろ8年が経ちます、星光輝と申します。
守備範囲はサーバーサイドですが、アプリ・インフラ・テスト・開発環境改善あたりの経験もある、広く浅い系エンジニアです。
最近は chalice というフレームワークを使い、およそ1年近く python に関わっています。

今回書くこと

API を作る時に、入力として受け取った JSON を意図したクラスにマッピングして欲しいなぁ...と思うことがあり、
python にそれっぽいライブラリがなかったので、自作したお話です。
python のバージョンは 3.8 です (from typing import get_origin get_args /できるのが3.8以降)。

なぜ作るのか?

汎用ライブラリの json の json.loads でも簡易的にはできるようなんですが、
そのために設定を頑張って書かないといけなさそうだし、汎用的に書くのは難しそうな感じがしました。

Java でも 古くから Jackson という有名なライブラリがあるのだから、探せばあるだろうと思いましたが、それらしい記載は出てきません。
そこで、いっそのこと作ってしまおうということで作りました。

作っていく

全体的な設計方針

クラスのインスタンスを何らかのデータ形式に落とすことをシリアライズと言い、
逆に何らかのデータ形式をクラスのインスタンスに変換することをデシリアライズと言います。

ここでは、下記のような使い方をするとしましょう。

@dataclass
class Hello:
    hello: str

objectMapper = ObjectMapper()
instance = objectMapper.deserialize('{"hello": "mapper"}', Hello)
print(instance.hello) ## 出力: mapper

今回は、下記のクラス構造で作ります。
スクリーンショット 2020-12-23 1.28.23.png

まずはインタフェースを書く

import json
from typing import Type, TypeVar, List
from abc import ABCMeta, abstractmethod

class NotImplementedError(Exception):
    def __init__(self, message):
        super().__init__(message)    

class JsonDeserializer(metaclass=ABCMeta):
    @abstractmethod
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        pass

    @abstractmethod
    def deserialize(self, json: object, mappingClass: type) -> object:
        pass

T = TypeVar('T')

class ObjectMapper:
    deserializers: List[JsonDeserializer] = []

    def deserialize(self, jsonText: str, mappingClass: Type[T]) -> T:
        jsonData = json.loads(jsonText)
        for deserializer in ObjectMapper.deserializers:
            if deserializer.canDeserialize(jsonData, mappingClass):
                return deserializer.deserialize(jsonData, mappingClass)
        raise NotImplementedError(f'Cannot deserialize json({jsonData}) to class({mappingClass}).')

このようにして、ObjectMapper 内に JsonDeserializer のオブジェクトをリスト形式で持ち、
対応できる JsonDeserializer が見つかったら(canDeserialize(json, mappingClass) == true)、
それを使って様々なクラスに対して利用できるようにします。

単純ケースの攻略

まずは、リテラルに対する単体テストを書きます。

from main.json_deserializer import ObjectMapper

class TestObjectMapper:
    def test_deserializeInt(self):
        actual = ObjectMapper().deserialize('1', int)
        assert actual == 1

    def test_deserializeStr(self):
        actual = ObjectMapper().deserialize('"1"', str)
        assert actual == '1'

    def test_deserializeFloat(self):
        actual = ObjectMapper().deserialize('1.5', float)
        assert actual == 1.5

    def test_deserializeNull(self):
        actual = ObjectMapper().deserialize('null', str)
        assert actual is None
$ python -m pytest
================================================================ short test summary info =================================================================
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeInt - main.json_deserializer.NotImplementedError: Cannot deserialize json(1) to c...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeStr - main.json_deserializer.NotImplementedError: Cannot deserialize json(1) to c...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeFloat - main.json_deserializer.NotImplementedError: Cannot deserialize json(1.5) ...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeNull - main.json_deserializer.NotImplementedError: Cannot deserialize json(Null) ...
=================================================================== 5 failed in 0.21s ====================================================================```

まだ実装していないので当然こけますね。リテラル用の JsonDeserializer を実装します。

class ObjectMapper:
    deserializers: List[JsonDeserializer] = [
        LiteralDeserializer()   ## 追加
    ]

class LiteralDeserializer(JsonDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return type(json) in [int, float, str, type(None)]

    def deserialize(self, json: object, mappingClass: type) -> object:
        return json
collected 4 items                                                                                                                                        

tests/test_object_mapper.py ....

=================================================================== 4 passed in 0.03s ====================================================================

リテラルなんだけど、日時・Enum などの型にしたい場合

今度は下記のようなテストを加えます。

from datetime import datetime, date
from enum import Enum

class Member(Enum):
    JOHN = 'john'
    BOB = 'bob'

----(省略)----

    def test_deserializeDate(self):
        actual = ObjectMapper().deserialize('"2020-12-23"', date)
        assert actual == date(2020, 12, 23)

    def test_deserializeNaiveDateTime(self):
        actual = ObjectMapper().deserialize('"2020-12-23T03:00:00"', datetime)
        assert actual == datetime(2020, 12, 23, 3, 0, 0)

    def test_deserializeAwareDateTime(self):
        actual = ObjectMapper().deserialize('"2020-12-23T03:00:00+0900"', datetime)
        assert actual == datetime(2020, 12, 23, 3, 0, 0, tzinfo=timezone(timedelta(hours=+9), 'JST'))

    def test_deserializeEnum(self):
        actual = ObjectMapper().deserialize('"john"', Member)
        assert actual is Member.JOHN

日時には、Naive(タイムゾーンなし), Aware(タイムゾーンあり)があります。
異なる日時同士では計算できなかったりして、紛らわしいことがあるので全てを Aware にする方がいいこともありますが、
今回はどっちでも対応するデータを作成できるようにします。

試しにテスト実行

================================================================ short test summary info =================================================================
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeDate - AssertionError: assert '2020-12-23' == datetime.date(2020, 12, 23)
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeNativeDateTime - AssertionError: assert '2020-12-23T03:00:00' == datetime.datetim...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeAwareDateTime - AssertionError: assert '2020-12-23T03:00:00+0900' == datetime.dat...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeEnum - AssertionError: assert 'john' is <Member.JOHN: 'john'>
============================================================== 4 failed, 4 passed in 0.12s ===============================================================

...ってことで、実装。

class DateDeserializer(JsonDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return mappingClass == date

    def deserialize(self, json: object, mappingClass: type) -> object:
        dt = datetime.strptime(json, '%Y-%m-%d')
        return date(dt.year, dt.month, dt.day)

class DatetimeDeserializer(JsonDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return mappingClass == datetime

    def deserialize(self, json: object, mappingClass: type) -> object:
        try:
            return datetime.strptime(json, '%Y-%m-%dT%H:%M:%S%z') # かなり雑..
        except ValueError:
            return datetime.strptime(json, '%Y-%m-%dT%H:%M:%S')

class EnumDeserializer(JsonDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return issubclass(mappingClass, Enum)

    def deserialize(self, json: object, mappingClass: type) -> object:
        for enum in mappingClass:
            if enum.value == json:
                return enum

class ObjectMapper:
    deserializers: List[JsonDeserializer] = [
        DatetimeDeserializer(), ## 追加
        EnumDeserializer(), ## 追加
        DateDeserializer(), ## 追加
        LiteralDeserializer()
    ]
collected 8 items                                                                                                                                        

tests/test_object_mapper.py ........

=================================================================== 8 passed in 0.07s ====================================================================

コンテナクラスを扱う

さて次からはコンテナクラスを扱います。今回は、下記のコンテナクラスを作ります。

  • List
  • Map
  • Object

まずは、コンテナ共通クラスを作ります。このようなことをするのは、
コンテナクラスは内部の別構造に対して、再度デシリアライズ依頼をしなければならないからです。
(少しトリッキーですが、内部構造に対して再度 ObjectMapper._deserialize を呼んでいます)

class ContainerDeserializer(JsonDeserializer):
    def deserializeChild(self, json: object, mappingClass: type) -> object:
        return ObjectMapper._deserialize(json, mappingClass)


class ObjectMapper:
    deserializers: List[JsonDeserializer] = [
       ...JsonDeserializer
    ]
    def deserialize(self, jsonText: str, mappingClass: Type[T]) -> T:
        return self._deserialize(json.loads(jsonText), mappingClass)

    @staticmethod
    def _deserialize(jsonData: object, mappingClass: Type[T]) -> T:
        for deserializer in ObjectMapper.deserializers:
            if deserializer.canDeserialize(jsonData, mappingClass):
                return deserializer.deserialize(jsonData, mappingClass)
        raise NotImplementedError(f'Cannot deserialize json({jsonData}) to class({mappingClass}).')

そして、まずは構造を変えたので既存構造が壊れていないことを確認します。

collected 8 items                                                                                                                                        

tests/test_object_mapper.py ........                                                                                                               [100%]

=================================================================== 8 passed in 0.07s ====================================================================

大丈夫なようですので、再度テストを追加します。

@dataclass
class Person:
    name: str
    age: int

@dataclass
class Group:
    name: str
    leader: Person


## ---テスト追加----

    def test_deserializeRawList(self):
        actual = ObjectMapper().deserialize('[{"age": 35, "name": "鈴木"}, {"age": 21, "name": "山田"}]', list)
        assert len(actual) == 2
        assert actual[0].age == 35
        assert actual[0].name == '鈴木'
        assert actual[1].age == 21
        assert actual[1].name == '山田'

    def test_deserializeTypedList(self):
        actual = ObjectMapper().deserialize('[{"age": 35, "name": "鈴木"}, {"age": 21, "name": "山田"}]', List[Person])
        assert len(actual) == 2
        assert type(actual[0]) == Person
        assert actual[0].age == 35
        assert actual[0].name == '鈴木'
        assert type(actual[1]) == Person
        assert actual[1].age == 21
        assert actual[1].name == '山田'

    def test_deserializeRawDict(self):
        actual = ObjectMapper().deserialize('{"ID1":{"age": 35, "name": "鈴木"}, "ID3": {"age": 21, "name": "山田"}}', dict)
        assert len(actual) == 2
        assert actual['ID1'].age == 35
        assert actual['ID1'].name == '鈴木'
        assert actual['ID3'].age == 21
        assert actual['ID3'].name == '山田'

    def test_deserializeTypedDict(self):
        actual = ObjectMapper().deserialize(
            '{"ID1":{"age": 35, "name": "鈴木"}, "ID3": {"age": 21, "name": "山田"}}',
            Dict[str, Person]
        )
        assert len(actual) == 2
        assert type(actual['ID1']) == Person
        assert actual['ID1'].age == 35
        assert actual['ID1'].name == '鈴木'
        assert type(actual['ID3']) == Person
        assert actual['ID3'].age == 21
        assert actual['ID3'].name == '山田'

    def test_deserializeRawObject(self):
        actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', object)
        assert actual.name == 'グループ'
        assert actual.leader.age == 35
        assert actual.leader.name == '鈴木'

    def test_deserializeTypedObject(self):
        actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', Group)
        assert type(actual) == Group
        assert actual.name == 'グループ'
        assert type(actual.leader) == Person
        assert actual.leader.age == 35
        assert actual.leader.name == '鈴木'

試しにテスト実行...当然実装していn(ry

================================================================ short test summary info =================================================================
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeRawList - main.json_deserializer.NotImplementedError: Cannot deserialize json([{'...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeTypedList - main.json_deserializer.NotImplementedError: Cannot deserialize json([...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeRawDict - main.json_deserializer.NotImplementedError: Cannot deserialize json({'I...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeTypedDict - main.json_deserializer.NotImplementedError: Cannot deserialize json({...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeTypedObject - main.json_deserializer.NotImplementedError: Cannot deserialize json...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeRawObject - main.json_deserializer.NotImplementedError: Cannot deserialize json({...
============================================================== 6 failed, 8 passed in 0.36s ===============================================================

実装。

from inspect import signature, _ParameterKind
from typing import Type, TypeVar, List, Dict, Set, get_origin, get_args

class ListDeserializer(ContainerDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return get_origin(mappingClass) == list or mappingClass == list

    def deserialize(self, json: object, mappingClass: type) -> object:
        genericParams = get_args(mappingClass)
        hasGenericParams = genericParams is not None and len(genericParams) > 0
        param = genericParams[0] if hasGenericParams else object
        return [self.deserializeChild(el, param) for el in json]

class DictDeserializer(ContainerDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return get_origin(mappingClass) == dict or mappingClass == dict

    def deserialize(self, json: object, mappingClass: type) -> object:
        genericParams = get_args(mappingClass)
        hasKeyParam = genericParams is not None and len(genericParams) > 0
        hasValueParam = genericParams is not None and len(genericParams) > 1
        return {
            self.deserializeChild(k, genericParams[0] if hasKeyParam else object)
            : self.deserializeChild(v, genericParams[1] if hasValueParam else object)
            for k, v in json.items()
        }

class ObjectDeserializer(ContainerDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return type(json) == dict

    def deserialize(self, json: object, mappingClass: type) -> object:
        if mappingClass == object:
            return self.createRawObject(json)
        else:
            return self.createObject(json, mappingClass)

    def createRawObject(self, args: Dict[str, object]) -> object:
        className = ''.join(key.title() for key in args.keys()) + 'Obejct'
        newInstance = type(className, (object,), {})()
        for k, v in args.items():
            setattr(newInstance, k, self.deserializeChild(v, object))
        return newInstance

    def createObject(self, args: Dict[str, object], mappingClass: type) -> object:
        annotations = self.findAnnotations(mappingClass)
        requireArgs = self.findInitRequireArgs(mappingClass)

        result = object.__new__(mappingClass)
        initArgs = {}
        for k, v in args.items():
            val = self.deserializeChild(v, annotations.get(k, object))
            if k in requireArgs:
                initArgs[k] = val
            else:
                setattr(result, k, val)
        for req in requireArgs:
            if req not in initArgs:
                initArgs[req] = None
        result.__init__(**initArgs)
        return result

    @staticmethod
    def findAnnotations(mappingClass: type) -> Dict[str, type]:
        if hasattr(mappingClass, '__annotations__'):
            return mappingClass.__annotations__
        for k, v in signature(mappingClass.__init__).parameters.items():
            return {k:v.annotation for k,v in signature(mappingClass.__init__).parameters.items()}

    @staticmethod
    def findInitRequireArgs(mappingClass: type) -> Set[str]:
        return {
            k for k, v in signature(mappingClass.__init__).parameters.items()
            if v.name != 'self' and v.kind == _ParameterKind.POSITIONAL_OR_KEYWORD
        }


class ObjectMapper:
    deserializers: List[JsonDeserializer] = [
        DateDeserializer(),
        DatetimeDeserializer(),
        EnumDeserializer(),
        LiteralDeserializer(),
        ListDeserializer(), ## 追加
        DictDeserializer(), ## 追加
        ObjectDeserializer() ## 追加
    ]

テスト実行

collected 14 items                                                                                                                                       

tests/test_object_mapper.py ..............                                                                                                         [100%]

=================================================================== 14 passed in 0.18s ===================================================================

Object デシリアライザの挙動説明

Objectをマッピングするものになると、急に難易度が高くなりましたね。..なので、軽く説明します。

また、オブジェクトに内包されているデータをデシリアライズするには、
データのクラスを取得しなければいけませんが、それが格納されている可能性があるのは下記の二箇所です。

  • classに型定義がある場合 (下記コードの Group, GroupWithoutDataclass)
  • init の引数で型を定義している場合 (下記コードの GroupInitDef)
@dataclass
class Person:
    name: str
    age: int

# パターン1
@dataclass
class Group:
    name: str
    leader: Person

# パターン2
class GroupWithoutDataclass:
    name: str
    leader: Person

# パターン3
class GroupInitDef:
    def __init__(self, name: str, leader: Person):
        self.name = name
        self.leader = leader

また、python では Person('AA', 23) と実行すると Person クラスの __init__ メソッドが実行されます。
このメソッドでは、可変長引数(*args, **kwargs等)で定義されている変数以外は指定しないとエラーになります。
そのため、def findInitRequireArgs(mappingClass: type) -> Set[str] で定義すべき変数名を取得しています。

そして、今回はJSONデータとマッピングデータが下記のような関係だった場合に、下記の実装にしています。

  • JSONデータ が マッピングに必要なデータを持っていない → None で埋める
  • JSONデータ が マッピングに不必要なデータを持っている → setattr で属性追加している

今回は、このようにしていますが、場合によってはエラーにしたほうがいいケースもあるかもしれません。

念の為テストを追加。(Group は先ほどテストしているので省略)

    def test_deserializeTypedObjectWithoutDataclass(self):
        actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', GroupWithoutDataclass)
        assert type(actual) == GroupWithoutDataclass
        assert actual.name == 'グループ'
        assert type(actual.leader) == Person
        assert actual.leader.age == 35
        assert actual.leader.name == '鈴木'

    def test_deserializeTypedObjectInitDef(self):
        actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', GroupInitDef)
        assert type(actual) == GroupInitDef
        assert actual.name == 'グループ'
        assert type(actual.leader) == Person
        assert actual.leader.age == 35
        assert actual.leader.name == '鈴木'

    def test_deserializeTypedObjectEmptyJson(self):
        actual = ObjectMapper().deserialize('{}', Group)
        assert type(actual) == Group
        assert actual.name == None
        assert actual.leader == None

    def test_deserializeTypedObjectRedundantData(self):
        actual = ObjectMapper().deserialize('{"name": "グループ", "id": 2, "leader": {"age": 35, "name": "鈴木", "role": "課長"}}', Group)
        assert actual.name == 'グループ'
        assert actual.id == 2
        assert type(actual.leader) == Person
        assert actual.leader.age == 35
        assert actual.leader.name == '鈴木'
        assert actual.leader.role == '課長'
collected 18 items                                                                                                                                       

tests/test_object_mapper.py ..................                                                                                                     [100%]

=================================================================== 18 passed in 0.17s ===================================================================

もっと工夫をするなら...

実際はもうちょっと面倒な仕様があったりするので、プロジェクトに合わせて仕様を調整する必要があるかもしれません。

  • __init__メソッド に *args: T, **kwargs: T のような可変長引数
  • デフォルト値への対応
  • オブジェクト初期化時には init 実行しないようにして欲しいんだけど?
  • OrderedDict等の順序があるものにも対応できるのか?

ただ、今回はAPI等で受け取った JSON をクラスにマッピングすることが目的です。
JSONモデルに通常はそんなに複雑な要件は必要ないでしょう。

最後に

今回は、JSON からクラスのインスタンスを生成するオブジェクトマッパーを作成しました。
それほど大変な実装ではないですし、都度都度デシリアライズのロジックを追加していけばどんなものでも対応できるので、
メンテナンスはそれほど大変ではないと思います。

python は型定義がどんどん便利になっていますので、このような仕組みを押さえておくと今後何かいいことがあるかもしれません。

  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む