20211007のPythonに関する記事は30件です。

言語処理100本ノック「第8章: ニューラルネット」

はじめに 言語処理100本ノック2020を解きました。 ニューラルネットの実装にはPyTorchを用います。ただし、単層ニューラルネットについては可能な限りクラスtorch.nn.Moduleは用いず、tensorの計算で実装します。 環境はWindows10、Python3.8です。 70. 単語ベクトルの和による特徴量 過去の章で作成した学習データ(train.csv)、検証データ(valid.csv)、評価データ(test.csv)をpandasのDataFrameとして読み込みます。 import pandas as pd # 分割したデータの読み込み path = './data/NewsAggregatorDataset/' train = pd.read_csv(path + 'train.csv', sep='\t') valid = pd.read_csv(path + 'vaild.csv', sep='\t') test = pd.read_csv(path + 'test.csv', sep='\t') # データの確認 print(train.head(3)) print('学習データ') print(train['CATEGORY'].value_counts()) print('検証データ') print(valid['CATEGORY'].value_counts()) print('評価データ') print(test['CATEGORY'].value_counts()) output ID TITLE \ 0 374424 UPDATE 2-US airlines signal solid demand ahead... 1 314859 GLOBAL MARKETS-Subdued Ifo takes M&A shine off... 2 236379 Hugh Jackman didn't warn daughter about nude s... URL PUBLISHER \ 0 http://in.reuters.com/article/2014/07/09/usa-a... Reuters 1 http://in.reuters.com/article/2014/06/24/marke... Reuters 2 http://www.contactmusic.com/story/hugh-jackman... Contactmusic.com CATEGORY STORY HOSTNAME TIMESTAMP 0 b dOd6Dz1t7cfuQEMif1KJCYJLKHJ8M in.reuters.com 1404966295491 1 b d02UFXK26SEszSM8Q1X2nUHj2pcOM in.reuters.com 1403700942137 2 e dOjehg3YSV_e60Mp3ra-L5UQ_-AqM www.contactmusic.com 1400764595882 学習データ b 4502 e 4223 t 1219 m 728 Name: CATEGORY, dtype: int64 検証データ b 562 e 528 t 153 m 91 Name: CATEGORY, dtype: int64 評価データ b 563 e 528 t 152 m 91 Name: CATEGORY, dtype: int64 特徴量行列 Word2Vecモデルによる単語の特徴量ベクトルへの変換にはgensimを用います。単語の前処理では「記号の消去」「小文字化」「数字列を0に置換」の操作のみを行っています。 import string import re import torch from gensim.models import KeyedVectors model = KeyedVectors.load_word2vec_format('./data/GoogleNews-vectors-negative300.bin.gz', binary=True) # 前処理 def preprocessing(text): # string.punctuation(!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~)消去するためのテーブル table = str.maketrans('', '', string.punctuation) text = text.translate(table) # 記号を消去 # 小文字化 text = text.lower() # 数字列を0に置換 text = re.sub('[0-9]+', '0', text) return text # 文字列リストの各語の特徴量ベクトルを計算し、平均化してtensorとして返す def get_feature_vec(text): vec = [] # 文字列をスペースで分割する text = word_split(text) for i in range(len(text)): # modelに含まれる単語のみベクトル化 if text[i] in model.index_to_key: vec.append(model[text[i]]) return torch.tensor(sum(vec) / len(vec)) # 文字列をスペースで分割する def word_split(text): # すべての見出しの文字列をまとめたリスト words = [] for t in re.split(' +', text): if t != '': words.append(t) return words # 記事見出しを特徴量行列に train_fea_mat = [get_feature_vec(preprocessing(tr)) for tr in train['TITLE']] valid_fea_mat = [get_feature_vec(preprocessing(va)) for va in valid['TITLE']] test_fea_mat = [get_feature_vec(preprocessing(te)) for te in test['TITLE']] # 平均化した特徴量ベクトルをまとめたlistをtensorに変換 train_fea_mat = torch.stack(train_fea_mat) valid_fea_mat = torch.stack(valid_fea_mat) test_fea_mat = torch.stack(test_fea_mat) # 特徴量ベクトルを保存 path = './data/torch/' torch.save(train_fea_mat, path+'train_fea_mat.pt') torch.save(valid_fea_mat, path+'valid_fea_mat.pt') torch.save(test_fea_mat, path+'test_fea_mat.pt') ラベルベクトル 問題文の例の通りにカテゴリに対応した数値を割り当てます。 category_dict = {'b':0, 't':1, 'e':2, 'm':3} train_lab_vec = torch.tensor(train['CATEGORY'].map(lambda x: category_dict[x]).values) valid_lab_vec = torch.tensor(valid['CATEGORY'].map(lambda x: category_dict[x]).values) test_lab_vec = torch.tensor(test['CATEGORY'].map(lambda x: category_dict[x]).values) # ラベルベクトルを保存 path = './data/torch/' torch.save(train_lab_vec, path+'train_lab_vec.pt') torch.save(valid_lab_vec, path+'valid_lab_vec.pt') torch.save(test_lab_vec, path+'test_lab_vec.pt') 71. 単層ニューラルネットワークによる予測 クラスtorch.nn.Moduleを用いてニューラルネットの実装を行うことが一般的ですが、ここでは問題文の文脈に沿って、行列$W$をtensorで実装します。以降の節でも、この行列$W$をtensorの操作によって学習していきます。 # データの読み込み path = 'data/torch/train_fea_mat.pt' train_fea_mat = torch.load(path) print(train_fea_mat.shape) W = torch.randn(300, 4) softmax = torch.nn.Softmax(dim=1) print(softmax(torch.matmul(train_fea_mat[:1], W))) print(softmax(torch.matmul(train_fea_mat[:4], W))) output torch.Size([10672, 300]) tensor([[0.1038, 0.2496, 0.2370, 0.4096]]) tensor([[0.1038, 0.2496, 0.2370, 0.4096], [0.2429, 0.2660, 0.1713, 0.3198], [0.0426, 0.4262, 0.2380, 0.2932], [0.4396, 0.0468, 0.1468, 0.3669]]) 72. 損失と勾配の計算 入力$\boldsymbol{x}$と行列$W$の積$\boldsymbol{x}W$とラベルベクトルからクロスエントロピー損失を計算します。71で行ったsoftmaxの処理はPyTorchのクラスtorch.nn.CrossEntropyLossの処理に組み込まれています。また、このクラスでは、事例集合に対する損失は各事例の損失の平均をとるように処理されます。 # データの読み込み path = './data/torch/' train_fea_mat = torch.load(path+'train_fea_mat.pt') print(train_fea_mat.shape) train_lab_vec = torch.load(path+'train_lab_vec.pt') print(train_lab_vec.shape) print('---') # 事例x1 print('事例x1') W = torch.randn(300, 4, requires_grad = True) # 損失 loss_fn = torch.nn.CrossEntropyLoss() print('損失') loss = loss_fn(torch.matmul(train_fea_mat[:1], W), train_lab_vec[:1]) print(loss) # 勾配 print('勾配') loss.backward() print(W.grad) print('---') # 事例集合 print('事例集合') W = torch.randn(300, 4, requires_grad = True) loss = loss_fn(torch.matmul(train_fea_mat[:4], W), train_lab_vec[:4]) print(loss) # 勾配 print('勾配') loss.backward() print(W.grad) output torch.Size([10672, 300]) torch.Size([10672]) --- 事例x1 損失 tensor(0.5833, grad_fn=<NllLossBackward>) 勾配 tensor([[ 1.0242e-02, -1.3424e-04, -7.7465e-03, -2.3613e-03], [-1.1733e-02, 1.5378e-04, 8.8744e-03, 2.7051e-03], [-7.4652e-04, 9.7840e-06, 5.6462e-04, 1.7211e-04], ..., [ 3.9496e-03, -5.1765e-05, -2.9873e-03, -9.1059e-04], [-6.9436e-02, 9.1005e-04, 5.2518e-02, 1.6009e-02], [ 7.9347e-02, -1.0399e-03, -6.0014e-02, -1.8293e-02]]) --- 事例集合 tensor(1.8652, grad_fn=<NllLossBackward>) 勾配 tensor([[ 0.0095, 0.0002, -0.0119, 0.0022], [-0.0082, 0.0003, 0.0055, 0.0024], [-0.0006, -0.0005, 0.0056, -0.0045], ..., [-0.0048, 0.0001, 0.0020, 0.0026], [-0.0519, 0.0026, 0.0352, 0.0142], [ 0.0419, -0.0011, -0.0415, 0.0008]]) 73. 確率的勾配降下法による学習 学習データセットはtorch.utils.dataのTensorDataset, DataLoaderを用いてミニバッチ化します。ただし、ここでは確率的勾配降下法(SGD)による学習を行うので、batch_size=1とし、データを1つ入力するたびに重みの更新を行います。 %%time from torch.utils.data import TensorDataset, DataLoader # データセット ds = TensorDataset(train_fea_mat, train_lab_vec) # DataLoaderを作成 loader = DataLoader(ds, batch_size=1, shuffle=True) epochs = 100 learning_rate = 0.01 # 行列W W = torch.randn(300, 4, requires_grad=True) # 損失関数 loss_fn = torch.nn.CrossEntropyLoss() for epoch in range(1, epochs + 1): for inputs, targets in loader: # 損失 loss = loss_fn(torch.matmul(inputs, W), targets) # 勾配 loss.backward() # 更新 W = (W - W.grad * learning_rate).detach().requires_grad_() if epoch % 10 == 0: print(f"Epoch {epoch: 3d}: loss={loss: .4f}") output Epoch 10: loss= 1.2607 Epoch 20: loss= 0.0220 Epoch 30: loss= 0.0898 Epoch 40: loss= 0.0420 Epoch 50: loss= 0.1452 Epoch 60: loss= 0.0023 Epoch 70: loss= 0.5905 Epoch 80: loss= 0.0040 Epoch 90: loss= 0.0043 Epoch 100: loss= 0.0017 Wall time: 2min 49s 74. 正解率の計測 # 入力されたデータから予測分類ラベルを返す def pred(inputs, W): softmax = torch.nn.Softmax(dim=1) # 分類の確率 class_probs = softmax(torch.matmul(inputs, W)) # 確率最大のインデックスを取得 class_idx = torch.argmax(class_probs, dim=1) return class_idx # 予測ラベルと正解ラベルから正解率を返す def accuracy(pred, targets): correct = 0 for p, t in zip(pred, targets): if p == t: correct += 1 return correct / len(pred) # 訓練データの正解率 with torch.no_grad(): train_pred = pred(train_fea_mat, W) train_accuracy = accuracy(train_pred, train_lab_vec) print(f'train accuracy:{train_accuracy: .3f}') # 評価データ # データの読み込み path = './data/torch/' test_fea_mat = torch.load(path+'test_fea_mat.pt') test_lab_vec = torch.load(path+'test_lab_vec.pt') # 評価データの正解率 with torch.no_grad(): test_pred = pred(test_fea_mat, W) test_accuracy = accuracy(test_pred, test_lab_vec) print(f'test accuracy:{train_accuracy: .3f}') output train accuracy: 0.903 test accuracy: 0.886 75. 損失と正解率のプロット 各エポックが終了した時点での損失、正解率を記録し、プロットします。 learning %%time # データの読み込み path = 'data/torch/' valid_fea_mat = torch.load(path+'valid_fea_mat.pt') valid_lab_vec = torch.load(path+'valid_lab_vec.pt') # データセット ds = TensorDataset(train_fea_mat, train_lab_vec) # DataLoaderを作成 loader = DataLoader(ds, batch_size=1, shuffle=True) epochs = 100 learning_rate = 0.01 # 行列W W = torch.randn(300, 4, requires_grad=True) # 損失関数 loss_fn = torch.nn.CrossEntropyLoss() # プロットするためloss, accuracyを記録 train_losses = [] valid_losses = [] train_accuracies = [] valid_accuracies = [] for epoch in range(1, epochs + 1): for inputs, targets in loader: # 損失 loss = loss_fn(torch.matmul(inputs, W), targets) # 勾配 loss.backward() # 更新 W = (W - W.grad * learning_rate).detach().requires_grad_() # loss, accuracy with torch.no_grad(): # 訓練 train_pred = pred(train_fea_mat, W) train_accuracy = accuracy(train_pred, train_lab_vec) train_loss = loss_fn(torch.matmul(train_fea_mat, W), train_lab_vec).item() # 検証 valid_pred = pred(valid_fea_mat, W) valid_accuracy = accuracy(valid_pred, valid_lab_vec) valid_loss = loss_fn(torch.matmul(valid_fea_mat, W), valid_lab_vec).item() # loss, accuracyを記録する train_losses.append(train_loss) train_accuracies.append(train_accuracy) valid_losses.append(valid_loss) valid_accuracies.append(valid_accuracy) if epoch % 10 == 0: print(f"Epoch {epoch: 3d}:") print(f"train [loss: {train_loss: .4f}] [accuracy: {train_accuracy: .3f}]") print(f"valid [loss: {valid_loss: .4f}] [accuracy: {valid_accuracy: .3f}]") output Epoch 10: train [loss: 0.4004] [accuracy: 0.864] valid [loss: 0.4301] [accuracy: 0.860] Epoch 20: train [loss: 0.3526] [accuracy: 0.880] valid [loss: 0.3975] [accuracy: 0.871] Epoch 30: train [loss: 0.3296] [accuracy: 0.888] valid [loss: 0.3837] [accuracy: 0.875] Epoch 40: train [loss: 0.3163] [accuracy: 0.893] valid [loss: 0.3768] [accuracy: 0.882] Epoch 50: train [loss: 0.3075] [accuracy: 0.896] valid [loss: 0.3733] [accuracy: 0.883] Epoch 60: train [loss: 0.3008] [accuracy: 0.899] valid [loss: 0.3704] [accuracy: 0.885] Epoch 70: train [loss: 0.2958] [accuracy: 0.901] valid [loss: 0.3690] [accuracy: 0.885] Epoch 80: train [loss: 0.2920] [accuracy: 0.901] valid [loss: 0.3688] [accuracy: 0.884] Epoch 90: train [loss: 0.2888] [accuracy: 0.902] valid [loss: 0.3680] [accuracy: 0.882] Epoch 100: train [loss: 0.2862] [accuracy: 0.903] valid [loss: 0.3679] [accuracy: 0.882] Wall time: 3min 5s plot import matplotlib.pyplot as plt plt.style.use('ggplot') fig, ax = plt.subplots(1, 2, figsize=(12, 4)) x = list(range(1, epochs+1, 1)) trains = [train_losses, train_accuracies] valids = [valid_losses, valid_accuracies] titles = ['loss', 'accuracy'] for a, y1, y2, t in zip(ax, trains, valids, titles): a.plot(x, y1, label='train') a.plot(x, y2, label='valid') a.set_title(t) a.legend() plt.show() 76. チェックポイント 「最適化アルゴリズムの内部状態」を記録するとありますが、上で行った学習においては最適化アルゴリズムの内部状態が変化しません。SGDをクラスtorch.optim.SGDで実装した場合、メソッドstate_dict()を用いることで最適化アルゴリズムの内部状態を得ることができるので、ここでは機能の確認の意味も込めてクラスtorch.optim.SGDを使ってSGDを実装します。 チェックポイントとして「エポック」「重み行列」「最適化アルゴリズムの内部状態」「損失」を記録します。 %%time import torch.optim as optim import os # データセット ds = TensorDataset(train_fea_mat, train_lab_vec) # DataLoaderを作成 loader = DataLoader(ds, batch_size=1, shuffle=True) epochs = 100 learning_rate = 0.01 # 行列W W = torch.randn(300, 4, requires_grad=True) # 損失関数 loss_fn = torch.nn.CrossEntropyLoss() # 最適化 optimizer = optim.SGD([W], lr=learning_rate) # checkpoint用path path = '.\\data\\torch\\SGD\\' # checkpoint用フォルダ os.makedirs(path, exist_ok=True) for epoch in range(1, epochs + 1): for inputs, targets in loader: # 損失 loss = loss_fn(torch.matmul(inputs, W), targets) # 勾配 optimizer.zero_grad() loss.backward() # 更新 optimizer.step() # loss, accuracy with torch.no_grad(): # 訓練 train_pred = pred(train_fea_mat, W) train_accuracy = accuracy(train_pred, train_lab_vec) train_loss = loss_fn(torch.matmul(train_fea_mat, W), train_lab_vec).item() # 検証 valid_pred = pred(valid_fea_mat, W) valid_accuracy = accuracy(valid_pred, valid_lab_vec) valid_loss = loss_fn(torch.matmul(valid_fea_mat, W), valid_lab_vec).item() if epoch % 10 == 0: print(f"Epoch {epoch: 3d}:") print(f"train [loss: {train_loss: .4f}] [accuracy: {train_accuracy: .3f}]") print(f"valid [loss: {valid_loss: .4f}] [accuracy: {valid_accuracy: .3f}]") # チェックポイント if epoch < 10: str_epoch = '00'+str(epoch) elif epoch < 100: str_epoch = '0'+str(epoch) else: str_epoch = str(epoch) torch.save({'epoch': epoch, 'W': W, 'optimizer_state_dict': optimizer.state_dict(), 'loss': loss,}, path+str_epoch+'.qt') チェックポイントの内容を確認します。ここではエポック1、100の内容を表示しています。 path = '.\\data\\torch\\SGD\\' for str_epoch in ['001', '100']: checkpoint = torch.load(path+str_epoch+'.qt') print(str_epoch+'.qt') print(checkpoint['W']) print(checkpoint['optimizer_state_dict'], '\n') output 001.qt tensor([[-1.1179e+00, -5.2224e-01, -8.5316e-01, 1.0265e-03], [-1.0001e+00, 1.4584e+00, -7.6860e-01, -5.5979e-01], [-6.5533e-01, 1.0893e+00, 1.8742e+00, -6.3610e-02], ..., [-1.3617e-01, -1.1282e+00, -3.3203e-01, 7.7469e-01], [ 1.0637e+00, 3.2511e-02, -2.5581e-01, -3.2261e-01], [-6.7052e-01, 2.2379e-01, -7.1881e-01, 7.4596e-01]], requires_grad=True) {'state': {0: {'momentum_buffer': None}}, 'param_groups': [{'lr': 0.01, 'momentum': 0, 'dampening': 0, 'weight_decay': 0, 'nesterov': False, 'params': [0]}]} 100.qt tensor([[-1.7633, 1.1876, -1.9330, 0.0163], [-0.9714, 0.6202, 0.3532, -0.8718], [ 1.5965, 0.8069, -1.2901, 1.1309], ..., [ 0.0246, -1.1899, 0.1920, 0.1514], [-0.3103, -0.4224, 0.6214, 0.6293], [-1.7357, -1.2247, 0.3008, 2.2404]], requires_grad=True) {'state': {0: {'momentum_buffer': None}}, 'param_groups': [{'lr': 0.01, 'momentum': 0, 'dampening': 0, 'weight_decay': 0, 'nesterov': False, 'params': [0]}]} ここではあまり恩恵を感じられませんが、より複雑なアーキテクチャーや最適化アルゴリズムを持つモデルでメソッドstate_dict()はより有用なものとなるでしょう! 77. ミニバッチ化 バッチサイズは1,2,4,8,...,128で実験しました。また、各バッチサイズでのエポック数は10としました。 import time # データセット ds = TensorDataset(train_fea_mat, train_lab_vec) epochs = 10 learning_rate = 0.01 # バッチサイズ batch_sizes = [2**i for i in range(8)] # 時間 times = [] for batch_size in batch_sizes: # DataLoaderを作成 loader = DataLoader(ds, batch_size=batch_size, shuffle=True) # 行列W W = torch.randn(300, 4, requires_grad=True) # 損失関数 loss_fn = torch.nn.CrossEntropyLoss() # 最適化 optimizer = optim.SGD([W], lr=learning_rate) print('batch size: ', batch_size) start = time.time() for epoch in range(1, epochs + 1): for inputs, targets in loader: # 損失 loss = loss_fn(torch.matmul(inputs, W), targets) # 勾配 optimizer.zero_grad() loss.backward() # 更新 optimizer.step() # loss, accuracy with torch.no_grad(): # 訓練 train_pred = pred(train_fea_mat, W) train_accuracy = accuracy(train_pred, train_lab_vec) train_loss = loss_fn(torch.matmul(train_fea_mat, W), train_lab_vec).item() # 検証 valid_pred = pred(valid_fea_mat, W) valid_accuracy = accuracy(valid_pred, valid_lab_vec) valid_loss = loss_fn(torch.matmul(valid_fea_mat, W), valid_lab_vec).item() if epoch % 2 == 0: print(f"Epoch {epoch: 3d}:") print(f"train [loss: {train_loss: .4f}] [accuracy: {train_accuracy: .3f}]") print(f"valid [loss: {valid_loss: .4f}] [accuracy: {valid_accuracy: .3f}]") # 1epochあたりの時間を計算 t = (time.time()-start) / epochs times.append(t) print('time / ephocs: ', t, '\n') print(times) output ...(学習状況の表示は省略)... [2.1517689704895018, 1.108371639251709, 0.6498688220977783, 0.4161367654800415, 0.28735790252685545, 0.20714108943939208, 0.18788692951202393, 0.16640586853027345] 78. GPU上での学習 基本的にはtensor型のデータに対しGPUを有効にすることで、GPU上での学習が可能となるはずですが、この辺りについては理解が浅いので不必要にGPUを有効にしていたり、GPUを有効にすべき場所でしていなかったりするかもしれません。 # GPU device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu') # データセット ds = TensorDataset(train_fea_mat.to(device), train_lab_vec.to(device)) epochs = 10 learning_rate = 0.01 # バッチサイズ batch_sizes = [2**i for i in range(8)] # 時間 times = [] for batch_size in batch_sizes: # DataLoaderを作成 loader = DataLoader(ds, batch_size=batch_size, shuffle=True) # 行列W W = torch.randn(300, 4, requires_grad=True).to(device) # 損失関数 loss_fn = torch.nn.CrossEntropyLoss() # 最適化 optimizer = optim.SGD([W], lr=learning_rate) print('batch size: ', batch_size) start = time.time() for epoch in range(1, epochs + 1): for inputs, targets in loader: # 損失 loss = loss_fn(torch.matmul(inputs, W), targets).to(device) # 勾配 optimizer.zero_grad() loss.backward() # 更新 optimizer.step() # loss, accuracy with torch.no_grad(): # 訓練 train_pred = pred(train_fea_mat.to(device), W) train_accuracy = accuracy(train_pred, train_lab_vec) train_loss = loss_fn(torch.matmul(train_fea_mat.to(device), W), train_lab_vec.to(device)).item() # 検証 valid_pred = pred(valid_fea_mat.to(device), W) valid_accuracy = accuracy(valid_pred, valid_lab_vec) valid_loss = loss_fn(torch.matmul(valid_fea_mat.to(device), W), valid_lab_vec.to(device)).item() if epoch % 2 == 0: print(f"Epoch {epoch: 3d}:") print(f"train [loss: {train_loss: .4f}] [accuracy: {train_accuracy: .3f}]") print(f"valid [loss: {valid_loss: .4f}] [accuracy: {valid_accuracy: .3f}]") # 1epochあたりの時間を計算 t = (time.time()-start) / epochs times.append(t) print('time / ephocs: ', t, '\n') print(times) output ...(学習状況の表示は省略)... [2.2572943925857545, 1.1734703302383422, 0.6524808883666993, 0.43701658248901365, 0.30716125965118407, 0.251761794090271, 0.18550732135772705, 0.18426837921142578] 計算速度はGPUを使わない場合にくらべて改善はしていません。より複雑なアーキテクチャー、バッチ学習においてGPUでの計算は有効になるでしょう! 79. 多層ニューラルネットワーク バイアス項の導入 入力$\boldsymbol{x}$と行列$W$の積$\boldsymbol{x}W$を計算した結果にバイアスベクトル$\boldsymbol{b}$を加えるモデルを考えます。バイアスベクトル$\boldsymbol{b}$の実装も、tensorを用います。また、このバイアス項を加えたモデルの予測値を返す関数pred_with_bを新たに定義します。 %%time # 入力されたデータから予測分類ラベルを返す def pred_with_b(inputs, W, b): softmax = torch.nn.Softmax(dim=1) # 分類の確率 class_probs = softmax(torch.matmul(inputs, W) + b) # 確率最大のインデックスを取得 class_idx = torch.argmax(class_probs, dim=1) return class_idx # GPU device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu') # データセット ds = TensorDataset(train_fea_mat.to(device), train_lab_vec.to(device)) # DataLoaderを作成 loader = DataLoader(ds, batch_size=1, shuffle=True) epochs = 100 learning_rate = 0.01 # 行列W W = torch.randn(300, 4, requires_grad=True).to(device) # バイアス項 b = torch.randn(1, 4, requires_grad=True).to(device) # 損失関数 loss_fn = torch.nn.CrossEntropyLoss() # 最適化 optimizer = optim.SGD([W, b], lr=learning_rate) for epoch in range(1, epochs + 1): for inputs, targets in loader: # 損失 loss = loss_fn(torch.matmul(inputs, W)+b, targets).to(device) # 勾配 optimizer.zero_grad() loss.backward() # 更新 optimizer.step() # loss, accuracy with torch.no_grad(): # 訓練 train_pred = pred_with_b(train_fea_mat.to(device), W, b) train_accuracy = accuracy(train_pred, train_lab_vec) train_loss = loss_fn(torch.matmul(train_fea_mat.to(device), W)+b, train_lab_vec.to(device)).item() # 検証 valid_pred = pred_with_b(valid_fea_mat.to(device), W, b) valid_accuracy = accuracy(valid_pred, valid_lab_vec) valid_loss = loss_fn(torch.matmul(valid_fea_mat.to(device), W)+b, valid_lab_vec.to(device)).item() if epoch % 10 == 0: print(f"Epoch {epoch: 3d}:") print(f"train [loss: {train_loss: .4f}] [accuracy: {train_accuracy: .3f}]") print(f"valid [loss: {valid_loss: .4f}] [accuracy: {valid_accuracy: .3f}]") output Epoch 10: train [loss: 0.3958] [accuracy: 0.862] valid [loss: 0.4373] [accuracy: 0.850] Epoch 20: train [loss: 0.3491] [accuracy: 0.879] valid [loss: 0.4000] [accuracy: 0.869] Epoch 30: train [loss: 0.3285] [accuracy: 0.890] valid [loss: 0.3851] [accuracy: 0.879] Epoch 40: train [loss: 0.3148] [accuracy: 0.893] valid [loss: 0.3778] [accuracy: 0.882] Epoch 50: train [loss: 0.3063] [accuracy: 0.896] valid [loss: 0.3736] [accuracy: 0.883] Epoch 60: train [loss: 0.2997] [accuracy: 0.897] valid [loss: 0.3712] [accuracy: 0.885] Epoch 70: train [loss: 0.2956] [accuracy: 0.900] valid [loss: 0.3703] [accuracy: 0.885] Epoch 80: train [loss: 0.2911] [accuracy: 0.900] valid [loss: 0.3680] [accuracy: 0.888] Epoch 90: train [loss: 0.2877] [accuracy: 0.901] valid [loss: 0.3675] [accuracy: 0.887] Epoch 100: train [loss: 0.2850] [accuracy: 0.902] valid [loss: 0.3671] [accuracy: 0.888] Wall time: 4min 4s 多層ニューラルネットワーク ここではクラスtorch.nn.Moduleを用いて実装を行います。 ニューラルネットワークをクラスMLNetとして定義します。中間層は2層で、サイズは64とし、重みは平均0、標準偏差1で初期化します。 import torch.nn as nn class MLNet(nn.Module): def __init__(self, input_size, output_size): super(MLNet, self).__init__() self.fc1 = nn.Linear(input_size, 64) self.fc2 = nn.Linear(64, output_size) nn.init.normal_(self.fc1.weight, 0.0, 1.0) nn.init.normal_(self.fc2.weight, 0.0, 1.0) nn.init.normal_(self.fc1.bias, 0.0, 1.0) nn.init.normal_(self.fc2.bias, 0.0, 1.0) def forward(self, x): x = self.fc1(x) x = self.fc2(x) return x from torch.utils.data import TensorDataset, DataLoader # GPU device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu') # データセット ds = TensorDataset(train_fea_mat.to(device), train_lab_vec.to(device)) # DataLoaderを作成 loader = DataLoader(ds, batch_size=1, shuffle=True) # hyper parameters input_size = 300 output_size = 4 epochs = 100 learning_rate = 0.01 # 損失関数 loss_fn = torch.nn.CrossEntropyLoss() # プロットするためloss, accuracyを記録 train_losses = [] valid_losses = [] train_accuracies = [] valid_accuracies = [] model = MLNet(input_size, output_size).to(device) optimizer = optim.SGD(model.parameters(), lr=learning_rate) for epoch in range(1, epochs + 1): for inputs, targets in loader: # forward outputs = model(inputs) # 損失 loss = loss_fn(outputs, targets).to(device) # 勾配 optimizer.zero_grad() loss.backward() # 更新 optimizer.step() # loss, accuracy with torch.no_grad(): train_pred = model(train_fea_mat.to(device)) train_accuracy = accuracy(torch.argmax(train_pred.to(device), dim=1), train_lab_vec) train_loss = loss_fn(train_pred.to(device), train_lab_vec.to(device)).item() valid_pred = model(valid_fea_mat.to(device)) valid_accuracy = accuracy(torch.argmax(valid_pred.to(device), dim=1), valid_lab_vec.to(device)) valid_loss = loss_fn(valid_pred.to(device), valid_lab_vec.to(device)).item() train_losses.append(train_loss) train_accuracies.append(train_accuracy) valid_losses.append(valid_loss) valid_accuracies.append(valid_accuracy) if epoch % 10 == 0: print(f"Epoch {epoch: 3d}:") print(f"train [loss: {train_loss: .4f}] [accuracy: {train_accuracy: .3f}]") print(f"valid [loss: {valid_loss: .4f}] [accuracy: {valid_accuracy: .3f}]") output Epoch 10: train [loss: 0.3966] [accuracy: 0.857] valid [loss: 0.4338] [accuracy: 0.847] Epoch 20: train [loss: 0.3485] [accuracy: 0.880] valid [loss: 0.3949] [accuracy: 0.869] Epoch 30: train [loss: 0.3263] [accuracy: 0.889] valid [loss: 0.3808] [accuracy: 0.879] Epoch 40: train [loss: 0.3140] [accuracy: 0.895] valid [loss: 0.3731] [accuracy: 0.882] Epoch 50: train [loss: 0.3057] [accuracy: 0.898] valid [loss: 0.3702] [accuracy: 0.882] Epoch 60: train [loss: 0.2989] [accuracy: 0.900] valid [loss: 0.3670] [accuracy: 0.885] Epoch 70: train [loss: 0.2940] [accuracy: 0.902] valid [loss: 0.3658] [accuracy: 0.885] Epoch 80: train [loss: 0.2907] [accuracy: 0.902] valid [loss: 0.3651] [accuracy: 0.885] Epoch 90: train [loss: 0.2884] [accuracy: 0.902] valid [loss: 0.3647] [accuracy: 0.887] Epoch 100: train [loss: 0.2852] [accuracy: 0.903] valid [loss: 0.3646] [accuracy: 0.882] 参考文献 【言語処理100本ノック 2020】第8章: ニューラルネット - Qiita PyTorch documentation
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

EasyOCRで日本人だけが読めないフォント「Electroharmonix」を認識させてみた

はじめに ちょっとOCRを試してみたいなと思っていたところ、EasyOCRというPythonパッケージを見つけた 環境構築や認識する言語の切り替えがお手軽に出来、非常に使いやすい 日本人だけが読めないフォントはどう認識するのか、ふと気になり試してみた ソースコードはこちら ※後から調べて分かったが『日本人「だけ」読めないフォント』が本当に日本人だけ読めないのか、ニューラルネットの力を借りて確認してみたという記事が既にある。本記事はこのEasyOCR版だと思って頂ければ... EasyOCR その名の通り、簡単にOCRが試せるパッケージ Github デモページ そもそもOCRとは? Optical Character Recognitionの略 ざっくり画像から文字を抽出して認識する技術と理解している Electroharmonix 日本人だけが読めないフォントと言われているフォント ちなみに、下記の画像はモレモコヤロカムヤ巾ロウエメElectroharmonixと書いてある 記号とかはあるけど、大文字だけで小文字はなさそう (出典: Electroharmonix) 検証方法 下記のフォント使って「Hello, Wolrd」と書かれた画像を生成する electroharmonix IPAex明朝 IPAexゴシック EasyOCRで以下の言語を指定して文字認識を実行する 英語 日本語 日本語&英語 EasyOCRの出力した文字列から考察を行う 実装 実装といってもテキスト画像生成がメイン 事前準備 electroharmonixを含むいくつかのフォントを比較用にインストールしておく 今回は比較用にIPAexフォントもダウンロードした 必要パッケージのインストール 画像生成にpillowを用いるがEasyOCRに含まれていたため、EasyOCRのインストールのみで事足りた パッケージ管理にはPoetryを利用している Pillowを用いた画像生成 主な処理は以下の通り フォントサイズ: 48, 文字色: 黒, 背景色: 白に設定 指定したフォントでテキストを書いた時の領域(幅、高さ)を算出 パディングが全体のテキスト領域の10%になるように画像サイズ、テキスト表示位置を設定 @dataclass class TextImageGenerator: font_filepath: str = "fonts/ipaexg.ttf" fontsize: int = 48 px_ratio: float = 0.1 py_ratio: float = 0.1 def generate( self, text: str, color: Color = (0, 0, 0), bg_color: Color = (255, 255, 255), ) -> Image: # fontを取得 font = self._get_font() # 生成画像のサイズ、テキストの左上位置を取得 canvas_size, text_tl = self._get_canvas_size(text, font) # 下地となるImageを作成 img = self._get_canvas(bg_color, canvas_size) # 文字を描画 draw = ImageDraw.Draw(img) draw.text(text_tl, text, fill=color, font=font) return img def _get_canvas_size(self, text: str, font: ImageFont.FreeTypeFont) -> Tuple[Tuple[int, int], Tuple[int, int]]: text_width, text_height = ImageDraw.Draw(Image.new("RGB", (200, 200))).textsize(text, font) width = int(text_width * (1 + 2 * self.px_ratio)) height = int(text_height * (1 + 2 * self.py_ratio)) text_top_left_x = int(width * self.px_ratio / (1 + 2 * self.px_ratio)) text_top_left_y = int(height * self.py_ratio / (1 + 2 * self.py_ratio)) return (width, height), (text_top_left_x, text_top_left_y) def _get_canvas(self, bg_color: Color, canvas_size: Tuple[int, int]) -> Image: return Image.new("RGB", canvas_size, bg_color) def _get_font(self) -> ImageFont.FreeTypeFont: return ImageFont.truetype(self.font_filepath, self.fontsize) EasyOCRを用いた各フォント、言語での文字認識 主な処理は以下の通り 生成画像に対してEasyOCRで文字認識を実行する 認識した領域ごとに分割されてしまうため、全ての領域の結果を抽出する 上記の処理をフォントごと、言語ごとに行う 結果 画像生成結果とOCRの認識結果は下記の通り 画像生成結果 electroharmonix IPAex明朝 IPAexゴシック OCR認識結果 フォント 言語 結果 electroharmonix 英語 bELLA; WARlDi electroharmonix 日本語 カモレレロ,山口ポレワ』 electroharmonix 英語&日本語 カモレレロ,山口ポレワ』 IPAex明朝 英語 HELLO, WORLDI' IPAex明朝 日本語 チ__0, 0♪_」 IPAex明朝 英語&日本語 HELLO,VORLD」 IPAexゴシック 英語 HELLO, WORLDI IPAexゴシック 日本語 んヒ__0一0二_! IPAexゴシック 英語&日本語 HELLOWORLD! 考察 electroharmonix 英語としても結構外れてる... 日本人らしい認識だなぁ、一点気になるはヤじゃなくてポって読むのね ロ(ろ)と口(くち)を分けてるところも日本人らしくて個人的にビックリした 英語入れても日本語と一緒 IPAex系 日本語として読めないのはまぁ分かる 英語でも惜しいけど100点ではない「!」と「I」の違いって確かに結構難しい まとめ 他の記事でもみられるように、そもそも英語圏の人でも読みやすいフォントではなさそう ただ、思ったよりも日本人寄りの認識結果だった
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

AutoMLを使って30分でSignate-Intermediateへ昇格

まえがき Signateではコンペティションの成績で7段階で称号が割り当てられていますが、1つ上げるだけでもかなりハードルが高いものだと思っていました。レベルが高いのはもちろんのこと、コンペが数多くあるわけではないので昇格するのに時間がかかるものだと決めつけていましたが、意外とそうではありませんでした。 称号付きコンペやSOTA Challengeなど、昇格する機会を設けてくれており、運営の方々には感謝です。 試しに称号付きコンペに参加をし、初手でAutoMLを使ってみた所、案外すんなり閾値を超えることができ、Intermediateに昇格しました。 30分かからずにできたのでせっかくと思い、記事を作成しました。 ↓SignateHPから引用(https://signate.jp/users/rankings/help) 参加コンペティション 2021年10月のみ開催されているコンペです。 コード ライブラリのインポート import pandas as pd !pip install pycaret from pycaret.classification import * データの読み込み train_data = pd.read_csv("train.csv") train_data test_data = pd.read_csv("test.csv") test_data 学習データの正解ラベルの割合 len(train_data[train_data["loan_status"]==1])/len(train_data) Pycaret model = setup(data=train_data, target="loan_status") compare_models() 個人的にLightGBMが好きなので、とりあえず使っています。 model = create_model("lightgbm") F1スコアが最適になるようにパラメーターをチューニング tuned_model = tune_model(model, optimize="F1") ※Accuracy Ver. evaluate_model(tuned_model) とりあえず、学習データの正解ラベルの割合を閾値として採用 test_pred = predict_model(tuned_model, probability_threshold= 0.2148, data=test_data) test_pred 提出 sample = pd.read_csv("submit.csv", header=None) submission = sample.drop(columns=[1]) submission[0] = test_pred["Label"] submission.to_csv("submission.csv", header=None) あとがき Intermediateに昇格したことによって自己アピールにつながればいいなと思っています。 次は、AdvancedやExpertに昇格できるようにSOTA Challengeに参加してメダル獲得できるように頑張りたいと思います。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

ABC63 C - Bugged を解いた

ビット探索が頭をよぎったけど、2^100 って無理だな。 OK じゃあ,if で行こう。 Bugged.py N = int(input()) S0 = [] #10の倍数 S1 = [] #not 10 の倍数 for _ in range(N): s = int(input()) if s%10 == 0: S0.append(s) else: S1.append(s) S1.sort() Total = sum(S0)+sum(S1) if Total%10 != 0: #全部足した状態で not 10 の倍数なら答え print(Total) else: for i in range(len(S1)): #not10倍数だけのリスト S1 から小さい順に引いていく Total -=S1[i] if Total%10 != 0: #TotalSum%10 != 0 なら最大値 print(Total) break else: print(0) 気を付けたのは入力 S。 求めたいのは合計の最大値。 よって、引いていきたいのは not 10 の倍数を一個ずつ引いて試したい。 だから入力時に s1,s2... はデータを受け取る時点で分別した。 例えば、 10,15,25 って入力の場合、小さい順で引いていくと最大値は 25 になる。 でもそれって、15 だけ引けば、最大値は 35 になるよね?的なイメージ。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

[Python] リトライ処理簡単に書くなら tenacity ライブラリを使おう!

はじめに 昨今、外部のAPIやクラウドサービスを使うにあたり、リトライ処理を導入したいケースが増えてきました。 APIやクラウドサービス側の負荷制限、予期せぬエラーで処理が止まってしまう場合に、リトライ処理を入れたいからです。 自前実装でリトライ処理を導入しても良いのですが、リトライ間隔を指数関数的に増やしたい場合など込み入った実装を入れる場合は難しくなります。 そこで、今回はリトライ処理が行えるライブラリについての記事です。 Pythonでリトライ処理を書く際に導入すべきライブラリは? Pythonでリトライ処理を簡単に導入するためのライブラリを検索すると、以下の3つが検索に上がってきます。 tenacity retry retrying 今回はtenacityについての記事です。 ちなみに、tenacityは「粘り強い」みたいな意味だそうです。 retryとretryingではダメなの? 簡単に理由を表で説明すると以下になります。 ライブラリ 最終更新 スター数 tenacity 記事を書いている前日 3300 retry 2016年5月 420 retrying 2016年7月 1700 ※表は本記事を書いている2021年10月5日時点での情報 日本語で検索するとretryとretryingを使っている技術記事が多く出てくるのですが、これらのライブラリは更新が随分昔で止まっています。 retry 最終更新: 2016年5月 retrying 最終更新: 2016年7月 一方で、tenacityは更新され続けており、現在この記事を書いている前日(2021年10月4日)にも更新が入っています。 githubのスター数を見ても、tenacityが圧倒的に多くなっています。 ※2021年10月時点 - retry スター数: 420 - retrying スター数: 1700 - tenacity スター数: 3300 これらの理由から、今後も使い続けられるtenacityをリトライのライブラリに選ぶのが良いでしょう。 またtenacity自体、更新の止まったretryingライブラリからフォークして作られています。 そういった意味でもtenacityは最新版のretryingライブラリと言えるでしょう。 tenacityの使い方 基本はgithubのREADMEを参考にするので問題ないですが、細かく使い方を見ていきましょう。 全て紹介するのはボリューム的に難しいので、よく使うものだけ紹介していきます。かなり細かい動作が指定できるので、細かいリトライの制御を必要とする場合はgithubのREADMEを参考にしてください。 ※使う際は、pip installをしてください。 pip install tenacity 基本:関数にリトライ処理を導入する 最も基本の使い方は、特定の関数にデコレータとして使用することです。 デコレータを指定すると、例外が発生した場合はリトライ処理が行われることになります。 サンプルコード sample.py import random from tenacity import retry @retry def retry_test(): if random.randint(0, 10) > 1: print('retry') raise Exception else: return "Success!" print(retry_test()) 実行結果 retry retry retry retry Success! リトライ回数を指定する リトライ回数を指定する際は、@retryデコレータの引数stopに値を渡す必要があります。 また、その際にtenacityモジュールからstop_after_attemptをインポートする必要があることも忘れないようにしましょう。 サンプルコード sample.py from tenacity import retry, stop_after_attempt # 新たにimportを行う必要がある @retry(stop=stop_after_attempt(3)) def stop_after_attempt(): print('retry') raise Exception('retry error') stop_after_attempt() 実行結果 3回実行されたあと、エラーで落ちる。 retry retry retry Exception: retry error 一定時間リトライし続ける リトライ期間を指定する際は、@retryデコレータの引数stopに値を渡す必要があります。 また、その際にtenacityモジュールからstop_after_delayをインポートする必要があることも忘れないようにしましょう。 ※ここで紹介しているのは一定時間リトライし続ける実装であり、一定時間ごとにリトライを行う実装ではないことに注意してください。 sample.py from tenacity import retry, stop_after_delay @retry(stop=stop_after_delay(10)) def stop_after_delay_test(): print('retry') raise Exception stop_after_delay_test() 実行結果 """ retry retry =======retry出力続く retry 10秒後に止まる """ リトライ回数制限 & 時間制限 回数制限と時間制限をどちらもかけることができます。 下記はどちらかの制限にかかった場合、リトライを停止するサンプルコードです。 サンプルコード sample.py import time from tenacity import retry, stop_after_attempt, stop_after_delay @retry(stop=(stop_after_attempt(3) | stop_after_delay(10))) def stop_after_attempt_delay_combine(sleep: int): time.sleep(sleep) print('retry') raise Exception('retry error') stop_after_attempt_delay_combine(1) # => 1秒ごとにリトライされるので試行回数の制限でリトライが終了する stop_after_attempt_delay_combine(5) # => 5秒ごとにリトライされるので時間制限でリトライが終了する 時間をおいてリトライ APIアクセスのリトライなど、短期間でアクセスしすぎるとサービス提供側から危険なIPと判断されて弾かれてしまう場合があります。そんなときは、一定時間ごとのリトライを利用しましょう。 秒数固定で一定時間ごとのリトライをさせたい場合、wait引数にwait_fixed(秒)をわたしてあげます。 サンプルコード sample.py @retry(wait=wait_fixed(2)) def wait_fixed_test(): print('retry') raise Exception wait_fixed_test() 実行結果 他には以下のような実装が可能です。 - 指定した範囲の秒数でリトライ処理を行うwait_random(min=秒, max=秒) - 指数関数的にリトライ待機時間を増やすwait_exponential(multiplier=倍率, min=秒, max=秒)⇒こちらは後で説明 - 特定回数までは3秒ごとにリトライ、特定回数からは5秒ごとにリトライなど、細かい設定 細かい実装を実現したいときは、githubのREADMEを見てください。 指数関数のリトライは個別に解説します。 指数関数的にリトライ待機時間を設定する 指数関数的にリトライ待機時間を増やしたい場合、wait_exponential(multiplier=1, min=3, max=50)を利用します。 multiplierで指定した数字は「[指定した数値]*2」を表す数値です。 分かりにくいと思うので実例を見ていきましょう。 multiplierに1を設定すると、以下の動きになります。 1回目のリトライ 1 = 1秒 2回目のリトライ 1*2 = 2秒 3回目のリトライ 2*2 = 4秒 4回目のリトライ 4*2 = 16秒 5回目のリトライ 16*2 = 32秒 multiplierに3を設定すると、以下の動きになります。 1回目のリトライ 3 = 3秒 2回目のリトライ 3*2 = 6秒 3回目のリトライ 6*2 = 12秒 4回目のリトライ 12*2 = 24秒 5回目のリトライ 24*2 = 48秒 minは最低でも待機する時間を設定します。なので、先ほどの1回目のリトライで3秒のとき、min=5で指定されていた場合、5秒の待機を行います。 maxはその逆で、先ほどの5回目のリトライで48秒のとき、max=40で指定されていた場合、40秒の待機を行います。 サンプルコード sample.py from time import time from tenacity import retry, wait_exponential t = time() @retry(wait=wait_exponential(multiplier=1, min=3, max=50)) def wait_exponential_sample(): c = time() print(int(c-t)) print('retry') raise Exception 実行結果 0 retry 3 retry 6 retry 10 retry 18 retry 特定のエラー発生時、リトライする 特定のエラーを指定することで、そのエラーの発生時のみリトライを行うことができます。 retry_if_exception_typeをインポートし、リトライさせたい例外を渡します。 特定のエラーを指定できるのは、AWSから返ってきた例外を指定したりAPIから返ってきたエラーを指定できるので、実用度が高いです。 @retry(retry=retry_if_exception_type(TypeError)) 複数のエラーをしていするには、タプルの形式でエラーを渡してあげましょう。 @retry(retry=retry_if_exception_type((TypeError, KeyError))) 全体のサンプルコードは以下です。 リトライするサンプルコード sample.py from tenacity import retry, retry_if_exception_type @retry(retry=retry_if_exception_type((TypeError, IndexError))) def retry_if_exception_sample(): print('retry') raise TypeError # 指定した例外なのでリトライする retry_if_exception_sample() リトライしないサンプルコード sample.py from tenacity import retry, retry_if_exception_type @retry(retry=retry_if_exception_type((TypeError, IndexError))) def retry_if_exception_sample(): print('retry') raise KeyError # 指定した例外ではないのでリトライしない retry_if_exception_sample() また、コード内でリトライさせたい際に、TryAgain例外を投げることも可能です。 分かりにくいと思うのでまたサンプルコードを見ていきましょう。 sample.py from tenacity import retry, TryAgain # TryAgainをインポートする @retry def retry_if_exception_sample(num): if num > 10: print('retry') raise TryAgain else: return 'Success' retry_if_exception_sample(11) TryAgainを使えばどこでも例外を投げられるので、コーディングの自由度が増しそうですね。 とはいえ、使いすぎると汚いコードになりそうです。 特定の返り値の時、リトライする 特定の返り値の時、リトライする処理を書くこともできます。 retry_if_resultをインポートし、返り値を判定する関数を渡してあげましょう。 この説明だけでは分かりにくいので、サンプルコードを見ていきます。 サンプルコード sample.py from tenacity import retry, retry_if_result def check_result(result): """返り値がFailedならTrueを返す""" return result == 'Failed' @retry(retry=retry_if_result(check_result)) # 関数をretry_if_resultに渡す def retry_if_result_sample(num): print('retry') if num < 10: return 'Failed' return 'Success' print(retry_if_result_sample(5)) 返り値によってリトライの判断を出来るのも、使い勝手が良いです。 チェック用の関数の中でDBから値を取ってきて比較のような動きも出来るので、状況に応じた動的なリトライ処理も可能になります。 リトライ前後にログを仕込む 以下のように指定することで、前後どちらか、前後両方にログを仕込むことができます。 @retry(before=before_log(logger, logging.DEBUG)) 実行結果と一緒にサンプルコードを見ていきましょう。 サンプルコード sample.py from tenacity import retry, wait_fixed, before_log, after_log import sys import logging logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) logger = logging.getLogger(__name__) @retry(wait=wait_fixed(2), before=before_log(logger, logging.DEBUG), after=after_log(logger, logging.DEBUG)) def wait_fixed_sample(): print('retry') raise Exception wait_fixed_sample() 実行結果 DEBUG:__main__:Starting call to '__main__.wait_fixed_sample', this is the 1st time calling it. retry DEBUG:__main__:Finished call to '__main__.wait_fixed_sample' after 0.000(s), this was the 1st time calling it. DEBUG:__main__:Starting call to '__main__.wait_fixed_sample', this is the 2nd time calling it. retry DEBUG:__main__:Finished call to '__main__.wait_fixed_sample' after 2.015(s), this was the 2nd time calling it. 実行回数や、リトライ初回からの経過秒数がログ出力されます。 リトライ周りで何か不具合が生じたときのために、ログ出力をしておくとよいでしょう。 その他の機能 このほかにも様々な便利機能があるのですが、基本的にgithubのREADMEに書かれています。 ここまで読まれた方ならREADMEの読み方も分かるはずなので、そのほかの機能は「こんな機能もあるよ!」という紹介までにとどめておこうと思います。 (これ以上書くとキリがないと思ってしまった...) リトライ前とリトライ後にログを仕込む(Logging) リトライの統計情報を表示する(Statistics) リトライの制限まで達したあと、特定の関数を実行し返り値を返す(Callback) 関数呼び出し時にリトライ対象として指定する 関数ではなく特定のコードブロックをリトライ対象とする サンプルコード 今回解説であげたサンプルコードはgithubで公開しておきます。 圧倒的に公式が分かりやすいので、公式のgithubリポジトリを見て分からない場合のみ、お使いください。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

[Python] tenacity ライブラリを利用してリトライ処理を簡単に実装する

Pythonでリトライ処理を書く際に導入すべきライブラリは? Pythonでリトライ処理を簡単に導入するためのライブラリを検索すると、以下の3つが検索に上がってきます。 tenacity retry retrying 今回はtenacityについての記事です。 ちなみに、tenacityは「粘り強い」みたいな意味だそうです。 retryとretryingではダメなの? 簡単に理由を表で説明すると以下になります。 ライブラリ 最終更新 スター数 tenacity 記事を書いている前日 3300 retry 2016年5月 420 retrying 2016年7月 1700 ※表は本記事を書いている2021年10月5日時点での情報 日本語で検索するとretryとretryingを使っている技術記事が多く出てくるのですが、これらのライブラリは更新が随分昔で止まっています。 retry 最終更新: 2016年5月 retrying 最終更新: 2016年7月 一方で、tenacityは更新され続けており、現在この記事を書いている前日(2021年10月4日)にも更新が入っています。 githubのスター数を見ても、tenacityが圧倒的に多くなっています。 ※2021年10月時点 - retry スター数: 420 - retrying スター数: 1700 - tenacity スター数: 3300 これらの理由から、今後も使い続けられるtenacityをリトライのライブラリに選ぶのが良いでしょう。 またtenacity自体、更新の止まったretryingライブラリからフォークして作られています。 そういった意味でもtenacityは最新版のretryingライブラリと言えるでしょう。 tenacityの使い方 基本はgithubのREADMEを参考にするので問題ないですが、細かく使い方を見ていきましょう。 全て紹介するのはボリューム的に難しいので、よく使うものだけ紹介していきます。かなり細かい動作が指定できるので、細かいリトライの制御を必要とする場合はgithubのREADMEを参考にしてください。 ※使う際は、pip installをしてください。 pip install tenacity 基本:関数にリトライ処理を導入する 最も基本の使い方は、特定の関数にデコレータとして使用することです。 デコレータを指定すると、例外が発生した場合はリトライ処理が行われることになります。 サンプルコード sample.py import random from tenacity import retry @retry def retry_test(): if random.randint(0, 10) > 1: print('retry') raise Exception else: return "Success!" print(retry_test()) 実行結果 retry retry retry retry Success! リトライ回数を指定する リトライ回数を指定する際は、@retryデコレータの引数stopに値を渡す必要があります。 また、その際にtenacityモジュールからstop_after_attemptをインポートする必要があることも忘れないようにしましょう。 サンプルコード sample.py from tenacity import retry, stop_after_attempt # 新たにimportを行う必要がある @retry(stop=stop_after_attempt(3)) def stop_after_attempt(): print('retry') raise Exception('retry error') stop_after_attempt() 実行結果 3回実行されたあと、エラーで落ちる。 retry retry retry Exception: retry error 一定時間リトライし続ける リトライ期間を指定する際は、@retryデコレータの引数stopに値を渡す必要があります。 また、その際にtenacityモジュールからstop_after_delayをインポートする必要があることも忘れないようにしましょう。 ※ここで紹介しているのは一定時間リトライし続ける実装であり、一定時間ごとにリトライを行う実装ではないことに注意してください。 sample.py from tenacity import retry, stop_after_delay @retry(stop=stop_after_delay(10)) def stop_after_delay_test(): print('retry') raise Exception stop_after_delay_test() 実行結果 """ retry retry =======retry出力続く retry 10秒後に止まる """ リトライ回数制限 & 時間制限 回数制限と時間制限をどちらもかけることができます。 下記はどちらかの制限にかかった場合、リトライを停止するサンプルコードです。 サンプルコード sample.py import time from tenacity import retry, stop_after_attempt, stop_after_delay @retry(stop=(stop_after_attempt(3) | stop_after_delay(10))) def stop_after_attempt_delay_combine(sleep: int): time.sleep(sleep) print('retry') raise Exception('retry error') stop_after_attempt_delay_combine(1) # => 1秒ごとにリトライされるので試行回数の制限でリトライが終了する stop_after_attempt_delay_combine(5) # => 5秒ごとにリトライされるので時間制限でリトライが終了する 時間をおいてリトライ APIアクセスのリトライなど、短期間でアクセスしすぎるとサービス提供側から危険なIPと判断されて弾かれてしまう場合があります。そんなときは、一定時間ごとのリトライを利用しましょう。 秒数固定で一定時間ごとのリトライをさせたい場合、wait引数にwait_fixed(秒)をわたしてあげます。 サンプルコード sample.py @retry(wait=wait_fixed(2)) def wait_fixed_test(): print('retry') raise Exception wait_fixed_test() 実行結果 他には以下のような実装が可能です。 - 指定した範囲の秒数でリトライ処理を行うwait_random(min=秒, max=秒) - 指数関数的にリトライ待機時間を増やすwait_exponential(multiplier=倍率, min=秒, max=秒)⇒こちらは後で説明 - 特定回数までは3秒ごとにリトライ、特定回数からは5秒ごとにリトライなど、細かい設定 細かい実装を実現したいときは、githubのREADMEを見てください。 指数関数のリトライは個別に解説します。 指数関数的にリトライ待機時間を設定する 指数関数的にリトライ待機時間を増やしたい場合、wait_exponential(multiplier=1, min=3, max=50)を利用します。 multiplierで指定した数字は「[指定した数値]*2」を表す数値です。 分かりにくいと思うので実例を見ていきましょう。 multiplierに1を設定すると、以下の動きになります。 1回目のリトライ 1 = 1秒 2回目のリトライ 1*2 = 2秒 3回目のリトライ 2*2 = 4秒 4回目のリトライ 4*2 = 16秒 5回目のリトライ 16*2 = 32秒 multiplierに3を設定すると、以下の動きになります。 1回目のリトライ 3 = 3秒 2回目のリトライ 3*2 = 6秒 3回目のリトライ 6*2 = 12秒 4回目のリトライ 12*2 = 24秒 5回目のリトライ 24*2 = 48秒 minは最低でも待機する時間を設定します。なので、先ほどの1回目のリトライで3秒のとき、min=5で指定されていた場合、5秒の待機を行います。 maxはその逆で、先ほどの5回目のリトライで48秒のとき、max=40で指定されていた場合、40秒の待機を行います。 サンプルコード sample.py from time import time from tenacity import retry, wait_exponential t = time() @retry(wait=wait_exponential(multiplier=1, min=3, max=50)) def wait_exponential_sample(): c = time() print(int(c-t)) print('retry') raise Exception 実行結果 0 retry 3 retry 6 retry 10 retry 18 retry 特定のエラー発生時、リトライする 特定のエラーを指定することで、そのエラーの発生時のみリトライを行うことができます。 retry_if_exception_typeをインポートし、リトライさせたい例外を渡します。 特定のエラーを指定できるのは、AWSから返ってきた例外を指定したりAPIから返ってきたエラーを指定できるので、実用度が高いです。 @retry(retry=retry_if_exception_type(TypeError)) 複数のエラーをしていするには、タプルの形式でエラーを渡してあげましょう。 @retry(retry=retry_if_exception_type((TypeError, KeyError))) 全体のサンプルコードは以下です。 リトライするサンプルコード sample.py from tenacity import retry, retry_if_exception_type @retry(retry=retry_if_exception_type((TypeError, IndexError))) def retry_if_exception_sample(): print('retry') raise TypeError # 指定した例外なのでリトライする retry_if_exception_sample() リトライしないサンプルコード sample.py from tenacity import retry, retry_if_exception_type @retry(retry=retry_if_exception_type((TypeError, IndexError))) def retry_if_exception_sample(): print('retry') raise KeyError # 指定した例外ではないのでリトライしない retry_if_exception_sample() また、コード内でリトライさせたい際に、TryAgain例外を投げることも可能です。 分かりにくいと思うのでまたサンプルコードを見ていきましょう。 sample.py from tenacity import retry, TryAgain # TryAgainをインポートする @retry def retry_if_exception_sample(num): if num > 10: print('retry') raise TryAgain else: return 'Success' retry_if_exception_sample(11) TryAgainを使えばどこでも例外を投げられるので、コーディングの自由度が増しそうですね。 とはいえ、使いすぎると汚いコードになりそうです。 特定の返り値の時、リトライする 特定の返り値の時、リトライする処理を書くこともできます。 retry_if_resultをインポートし、返り値を判定する関数を渡してあげましょう。 この説明だけでは分かりにくいので、サンプルコードを見ていきます。 サンプルコード sample.py from tenacity import retry, retry_if_result def check_result(result): """返り値がFailedならTrueを返す""" return result == 'Failed' @retry(retry=retry_if_result(check_result)) # 関数をretry_if_resultに渡す def retry_if_result_sample(num): print('retry') if num < 10: return 'Failed' return 'Success' print(retry_if_result_sample(5)) 返り値によってリトライの判断を出来るのも、使い勝手が良いです。 チェック用の関数の中でDBから値を取ってきて比較のような動きも出来るので、状況に応じた動的なリトライ処理も可能になります。 リトライ前後にログを仕込む 以下のように指定することで、前後どちらか、前後両方にログを仕込むことができます。 @retry(before=before_log(logger, logging.DEBUG)) 実行結果と一緒にサンプルコードを見ていきましょう。 サンプルコード sample.py from tenacity import retry, wait_fixed, before_log, after_log import sys import logging logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) logger = logging.getLogger(__name__) @retry(wait=wait_fixed(2), before=before_log(logger, logging.DEBUG), after=after_log(logger, logging.DEBUG)) def wait_fixed_sample(): print('retry') raise Exception wait_fixed_sample() 実行結果 DEBUG:__main__:Starting call to '__main__.wait_fixed_sample', this is the 1st time calling it. retry DEBUG:__main__:Finished call to '__main__.wait_fixed_sample' after 0.000(s), this was the 1st time calling it. DEBUG:__main__:Starting call to '__main__.wait_fixed_sample', this is the 2nd time calling it. retry DEBUG:__main__:Finished call to '__main__.wait_fixed_sample' after 2.015(s), this was the 2nd time calling it. 実行回数や、リトライ初回からの経過秒数がログ出力されます。 リトライ周りで何か不具合が生じたときのために、ログ出力をしておくとよいでしょう。 その他の機能 このほかにも様々な便利機能があるのですが、基本的にgithubのREADMEに書かれています。 ここまで読まれた方ならREADMEの読み方も分かるはずなので、そのほかの機能は「こんな機能もあるよ!」という紹介までにとどめておこうと思います。 (これ以上書くとキリがないと思ってしまった...) リトライ前とリトライ後にログを仕込む(Logging) リトライの統計情報を表示する(Statistics) リトライの制限まで達したあと、特定の関数を実行し返り値を返す(Callback) 関数呼び出し時にリトライ対象として指定する 関数ではなく特定のコードブロックをリトライ対象とする サンプルコード 今回解説であげたサンプルコードはgithubで公開しておきます。 圧倒的に公式が分かりやすいので、公式のgithubリポジトリを見て分からない場合のみ、お使いください。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

gBizINFOから法人データを取得

gBizINFOの概要 gBizINFO 法人として登記されている約400万社を対象とし、法人番号、法人名、本社所在地に加えて、府省との契約情報、表彰情報等の政府が保有し公開している法人活動情報を本サイトで一括検索、閲覧できます。法人の中には、行政機関や管理組合等、法人番号が付与されている組織すべてが含まれています。 目次 データの種類 REST APIの提供 法人データの取得 3-1. 法人番号の取得 3-2. REST APIからのデータ取得 3-3. Pythonスクリプト コメント 参照 1. データの種類 政府が保有している届出認定情報、表彰情報、補助金情報、調達情報、特許情報、財務情報、職場情報。 2. REST APIの提供 API利用方法 gBizINFOが保有する法人情報に対して、情報提供REST APIにより、指定する条件に合致する法人の法人番号を検索します。また、情報提供REST APIにより、指定する法人番号の法人情報(法人基本情報、補助金情報、届出・認定情報、調達情報、特許情報、財務情報)を提供します。 3. 法人データの取得 3-1. 法人番号の取得 国税庁の法人番号公表サイトから、公表されている全ての法人の全国分の全件データをダウンロードする。 国税庁法人番号公表サイト・全件データのダウンロード 3-2. REST APIからのデータ取得 情報提供REST APIにより、法人番号の全件データに対応する法人情報(法人基本情報、補助金情報、届出・認定情報、調達情報、特許情報、財務情報)を取得する。 3-3. Pythonスクリプト 法人番号の全件リストを読み込み、gBizINFOのREST APIから法人番号毎に対応する法人データを取得する。 基本的な構文 # coding: utf-8 import os import requests import json import datetime import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns from tqdm.autonotebook import tqdm # API token token = 'XXXXXXX' # corporate numbers list corp_nums = pd.read_csv("00_zenkoku_all_20210831.csv", header=None, sep=',', encoding = "shift-jis", low_memory=False) corp_nums = corp_nums.iloc[0:,1].fillna(-1).astype(np.int64) print(corp_nums.head()) class gbizinfo: def __init__(self, token): self.headers = { "Accept": "application/json", "X-hojinInfo-api-token": token } self.endpoint_url = 'https://info.gbiz.go.jp/hojin/v1/hojin/' #届出・認定情報 def _get_corporate_certification(self, corp_nums): results = [] for corp_num in tqdm(corp_nums): try: r = requests.get( url = self.endpoint_url + str(corp_num) + '/certification', headers = self.headers, timeout = 10 ) res = r.json()["hojin-infos"][0] results.append(res) except: results.append({}) df = pd.json_normalize(results, 'certification', ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment'], errors='ignore') cols = ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment', 'category', 'date_of_approval', 'enterprise_scale', 'expiration_date', 'government_departments', 'target', 'title'] df = df[cols] return df #表彰情報 def _get_corporate_commendation(self, corp_nums): results = [] for corp_num in tqdm(corp_nums): try: r = requests.get( url = self.endpoint_url + str(corp_num) + '/commendation', headers = self.headers, timeout = 10 ) res = r.json()["hojin-infos"][0] results.append(res) except: results.append({}) df = pd.json_normalize(results, 'commendation', ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment'], errors='ignore') cols = ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment', 'date_of_commendation', 'title', 'target', 'category', 'government_departments'] df = df[cols] return df #財務情報 def _get_corporate_finance(self, corp_nums): results = [] for corp_num in tqdm(corp_nums): try: r = requests.get( url = self.endpoint_url + str(corp_num) + '/finance', headers = self.headers, timeout = 10 ) d = r.text res = json.loads(d)['hojin-infos'][0] results.append(res) except: results.append({}) df = pd.json_normalize(results, 'finance', ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment'], errors='ignore') cols = ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment', 'date_of_approval', 'title', 'amount', 'target', 'government_departments', 'note', 'joint_signatures', 'subsidy_resource'] df = df[cols] return df #特許情報 def _get_corporate_patent(self, corp_nums): results = [] for corp_num in tqdm(corp_nums): try: r = requests.get( url = self.endpoint_url + str(corp_num) + '/patent', headers = self.headers, timeout = 10 ) res = r.json()["hojin-infos"][0] results.append(res) except: results.append({}) df = pd.json_normalize(results, 'patent', ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment'], errors='ignore') cols = ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment', 'application_date', 'application_number', 'classifications', 'patent_type', 'title'] df = df[cols] return df #調達情報 def _get_corporate_procurement(self, corp_nums): results = [] for corp_num in tqdm(corp_nums): try: r = requests.get( url = self.endpoint_url + str(corp_num) + '/procurement', headers = self.headers, timeout = 10 ) res = r.json()["hojin-infos"][0] results.append(res) except: results.append({}) df = pd.json_normalize(results, 'procurement', ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment'], errors='ignore') cols = ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment', 'date_of_order', 'title', 'amount', 'government_departments'] df = df[cols] return df #補助金情報 def _get_corporate_subsidy(self, corp_nums): results = [] for corp_num in tqdm(corp_nums): try: r = requests.get( url = self.endpoint_url + str(corp_num) + '/subsidy', headers = self.headers, timeout = 10 ) res = r.json()["hojin-infos"][0] results.append(res) except: results.append({}) df = pd.json_normalize(results, 'subsidy', ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment'], errors='ignore') cols = ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment', 'date_of_approval', 'title', 'amount', 'target', 'government_departments', 'note', 'joint_signatures', 'subsidy_resource'] df = df[cols] return df #職場情報 def _get_corporate_workplace(self, corp_nums): results = [] for corp_num in tqdm(corp_nums): try: r = requests.get( url = self.endpoint_url + str(corp_num) + '/workplace', headers = self.headers, timeout = 10 ) res = r.json()["hojin-infos"][0] results.append(res) except: results.append({}) df = pd.json_normalize(results, 'workplace_info', ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment']) cols = ['corporate_number', 'name', 'representative_name', 'postal_code', 'location', 'capital_stock', 'employee_number', 'business_items', 'update_date', 'date_of_establishment', 'base_infos', 'compatibility_of_childcare_and_work', 'women_activity_infos'] df = df[cols] return df データの取得 gbizinfo = gbizinfo(token=token) #届出・認定情報の取得 #df_gbizinfo_certification = gbizinfo._get_corporate_certification(corp_nums) #print(df_gbizinfo_certification.iloc[0:20,]) #df_gbizinfo_certification.to_csv('gbizinfo_certification.csv', encoding='utf-8-sig') #表彰情報の取得 #df_gbizinfo_commendation = gbizinfo._get_corporate_commendation(corp_nums) #print(df_gbizinfo_commendation.iloc[0:20,]) #df_gbizinfo_commendation.to_csv('gbizinfo_commendation.csv', encoding='utf-8-sig') #財務情報の取得 #df_gbizinfo_finance = gbizinfo._get_corporate_finance(corp_nums) #print(df_gbizinfo_finance) #df_gbizinfo_finance.to_csv('gbizinfo_finance2.csv', encoding='utf-8-sig') #特許情報の取得 #df_gbizinfo_patent = gbizinfo._get_corporate_patent(corp_nums) #print(df_gbizinfo_patent.iloc[0:20,]) #df_gbizinfo_patent.to_csv('gbizinfo_patent.csv', encoding='utf-8-sig') #調達情報の取得 df_gbizinfo_procurement = gbizinfo._get_corporate_procurement(corp_nums) print(df_gbizinfo_procurement.iloc[0:20,]) df_gbizinfo_procurement.to_csv('gbizinfo_procurement.csv', encoding='utf-8-sig') #補助金情報の取得 #df_gbizinfo_subsidy = gbizinfo._get_corporate_subsidy(corp_nums) #print(df_gbizinfo_subsidy.iloc[0:20,]) #df_gbizinfo_subsidy.to_csv('gbizinfo_subsidy.csv', encoding='utf-8-sig') #職場情報の取得 #df_gbizinfo_workplace = gbizinfo._get_corporate_workplace(corp_nums) #print(df_gbizinfo_workplace.iloc[0:20,]) #df_gbizinfo_workplace.to_csv('gbizinfo_workplace.csv', encoding='utf-8-sig') 4. コメント 調達情報、補助金情報は、データが最も充実しており、一部、興味深いデータも見受けられた。 職場情報は、データの欠如が多く見受けられた。 5. 参照 gBizINFOから法人情報APIを取得してみる pythonでgbizinfoを介して法人番号を一気に取得
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

「英語→カタカナ変換機」を叩いて英単語をカタカナに変換したい

何? 英語→カタカナ変換機をBashかPythonで叩きたいなあと思った。 注意 連続して取得したい場合は十分なウェイトを空けておくこと。 Bash word="test" curl 'https://www.sljfaq.org/cgi/e2k_ja.cgi?o=json&word='"${word}"'&lang=ja' \ -A 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36' { "words": [ { "j_pron_spell": "テスト", "type": "dictionary", "word": "test", "j_pron_only": "テスト" } ], "is_romaji": 0, "has_spelling": 0 } Python from json import loads from json import loads from pprint import pp from urllib.parse import urlencode from urllib.request import Request, urlopen url = "https://www.sljfaq.org/cgi/e2k_ja.cgi" data = { "o": "json", "word": "test", "lang": "ja", } headers = { "User-Agent": ( "Mozilla/5.0 (X11; Linux x86_64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/51.0.2704.103 Safari/537.36" ), } params = urlencode(data, doseq=True) req = Request(f"{url}?{params}", headers=headers) res = loads(urlopen(req).read()) pp(res) {'has_spelling': 0, 'words': [{'j_pron_spell': 'テスト', 'type': 'dictionary', 'word': 'test', 'j_pron_only': 'テスト'}], 'is_romaji': 0}
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Python 共有メモリ (mmap)

本日はプロセス間、スレッド間などで値をやり取りする方法を紹介します。 C言語などを触ったことある人は聞いたことある人もいるかもしれません、 「共有メモリ (Shared Memory)」という仕組みです。 共有メモリとは かなりざっくりですが、共有メモリとは、名前の通りPCのメモリ上の一部を複数のプロセス間で使用できるようにする仕組みです。最初はこのくらいの理解で十分だと思います。 早速コードを紹介していきます。 共有メモリ管理クラス ClsMMap.py ''' --------------------------- 共有メモリ管理クラス ---------------------------''' import mmap import os WORD_SIZE = 2 '''1WORD = 2byte''' MAP_SIZE = 1024 * WORD_SIZE MAP_FILE_NAME = "./map.dat" class classMMap: def __init__(self) -> None: """コンストラクタ """ self._mm = None ''' map file check ''' if os.path.exists(MAP_FILE_NAME): ''' Read memory map ''' self._readMMapFile() else: ''' create and initialize ''' self._createMMapFile() ''' Read memory map ''' self._readMMapFile() print(" __init__ comp.") def _createMMapFile(self): with open(MAP_FILE_NAME, mode="wb") as file: initStr = '00' * MAP_SIZE initByte = bytes.fromhex(initStr) file.write(initByte) print("_createMMapFile fin.") def _readMMapFile(self): with open(MAP_FILE_NAME, mode="r+b") as file: self._mm = mmap.mmap(file.fileno(), 0) self._mm.seek(0) print("_readMMapFile fin.") def ReadShort(self, adr:int): """指定したアドレスの値を読み込む (読み込みデータ最大値 : 1WORD = 2bytes = 65535) Args: adr : 読み込みたいアドレス """ try: self._mm.seek(adr*WORD_SIZE) bytes = self._mm.read(WORD_SIZE) val = int.from_bytes(bytes, 'little',signed=True) self._mm.seek(0) return val except Exception as e: print("ReadShort except" + str(e).replace('\n','')) return def WriteShort(self, adr:int, data) -> None: """指定したアドレスに値を書き込む (書き込みデータ最大値 : 1WORD = 2bytes = 65535) Args: adr (int): 書き込みたいアドレス data (short): 書き込むデータ """ if data >= (2**(8*WORD_SIZE)): print("Error. Over 2bytes (65535)") return try: ddata = int(data) bytes = ddata.to_bytes(WORD_SIZE, 'little',signed=True) for i in range(WORD_SIZE): self._mm[adr*WORD_SIZE + i] = bytes[i] except Exception as e: print("WriteShort except" + str(e).replace('\n','')) def despose(self): self._mm.close() 共有メモリ使用例 これから実際に使用するサンプルも紹介します。(今回はスレッド間でのデータやり取りで例を示そうと思います。) app.py import time import threading from ClsMmap import classMMap ''' 上で作成した共有メモリ操作クラス ''' TEST_ADR = 100 def func1(): """100番アドレスを1秒ごとに読み出す """ while True: print(clsMMap.ReadShort(TEST_ADR)) time.sleep(1) def func2(): """100番アドレスを1秒ごとに書き換える(1 ~ 10000の範囲) """ while True: curVal = clsMMap.ReadShort(TEST_ADR) clsMMap.WriteShort(TEST_ADR, (curVal + 1) % 10000) ''' 値のインクリメント ''' time.sleep(1) if __name__ == '__main__': ''' 共有メモリ管理クラス インスタンス生成 ''' clsMMap = classMMap() ''' 読み込み用スレッド ''' thread1 = threading.Thread(target=func1) thread1.start() ''' 書き込み用スレッド ''' thread2 = threading.Thread(target=func2) thread2.start() 以上が今回の内容となります。是非参考にしてください! ※実務で利用する場合は排他処理や例外処理をもう少し実装する必要があります。 mmapのClose()処理も適宜呼び出してください。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Python logging 使い方

こんばんは、本日は実務アプリケーションなどでは重宝するloggingについて紹介します。 複数のスレッド間、ファイルが存在するときなどに便利です。 loggingメインプログラム appLogger.py ''' --------------------------- Python Logging サンプル ---------------------------''' import os import time import threading import logging import logging.handlers _logger = None def initLogging(isPrinted: bool): """ロギングの初期設定 (ログは指定したフォルダに保管される) Args: isPrinted (bool): コンソール画面に表示するときTrue """ """ ロギング設定 """ global _logger _logger = logging.getLogger('App') _logger.setLevel(logging.INFO) ''' ログファイル保存用のディレクトリ ''' if os.path.exists('./log') == False: os.makedirs('./log') ''' ログをファイルに出力するための設定 ''' rfh = logging.handlers.RotatingFileHandler( filename='./log/App.log', maxBytes=1024*1024, backupCount=5 ) _logger.addHandler(rfh) formatter = logging.Formatter('%(asctime)s,%(name)s,%(threadName)s,%(lineno)d,%(levelname)s,%(message)s') rfh.setFormatter(formatter) ''' ログをコンソールに出力するための設定 ''' if isPrinted: handler = logging.StreamHandler() handler.setLevel(logging.INFO) _logger.addHandler(handler) handler.setFormatter(formatter) _logger.info('initLogging fin.') def func1(): """Thread 1 Mehotd """ while True: _logger.info("Thread1: Heartbeat...") time.sleep(1) def func2(): """Thread 2 Mehotd """ while True: _logger.info("Thread2: Heartbeat...") time.sleep(1) if __name__ == '__main__': ''' ロギング設定の初期化 ''' initLogging(True) ''' ロギング出力テスト ''' _logger.info("INFO") _logger.warning("WARNING") _logger.error("ERROR") _logger.critical("CRITICAL") ''' 読み込み用スレッド ''' thread1 = threading.Thread(target=func1) thread1.start() ''' 書き込み用スレッド ''' thread2 = threading.Thread(target=func2) thread2.start() 簡単にプログラムの解説をします。 RotatingFileHandler ログファイルの出力と世代管理をします。上記の場合、logフォルダにApp.logというログファイルが生成されます。 filename 出力ファイル名 maxBytes 1ファイルの最大サイズ backupCount 世代管理数 maxBytesを超えると自動的に「App.log」 -> 「App.log1」 に変換され、以降のログは「App.log」に保存されます 上記の場合、世代管理は5なので、「App.log5」まで行くと古いものから削除されます。 出力形式は Formatter() で設定が可能です。 StreamHandler ここではコンソール画面への出力設定をします。出力内容は上記と同じになります。 getChild()による子ロガー作成 loggingメインプログラムで作成したLogging設定を使って、子ロガーを作成します。 これはPythonプログラムが複数に渡るときに便利です。 以下サンプルプログラムになります。親プログラムは上記とほぼ同じです。 LoggerParent.py ''' --------------------------- Python Logging Parent ---------------------------''' import os import logging import logging.handlers _logger = None def initLogging(isPrinted: bool): """ロギングの初期設定 (ログは指定したフォルダに保管される) Args: isPrinted (bool): コンソール画面に表示するときTrue """ """ ロギング設定 """ global _logger _logger = logging.getLogger('App') _logger.setLevel(logging.INFO) ''' ログファイル保存用のディレクトリ ''' if os.path.exists('./log') == False: os.makedirs('./log') ''' ログをファイルに出力するための設定 ''' rfh = logging.handlers.RotatingFileHandler( filename='./log/App.log', maxBytes=1024*1024, backupCount=1 ) _logger.addHandler(rfh) formatter = logging.Formatter('%(asctime)s,%(name)s,%(threadName)s,%(lineno)d,%(levelname)s,%(message)s') rfh.setFormatter(formatter) ''' ログをコンソールに出力するための設定 ''' if isPrinted: handler = logging.StreamHandler() handler.setLevel(logging.INFO) _logger.addHandler(handler) handler.setFormatter(formatter) _logger.info('initLogging fin.') if __name__ == '__main__': ''' ロギング設定の初期化 ''' initLogging(True) ''' ロギング出力テスト ''' _logger.info("INFO") _logger.warning("WARNING") _logger.error("ERROR") _logger.critical("CRITICAL") ''' Childアプリの起動 ''' import LoggerChild LoggerChild.Start() 続いて、子ロガー側の紹介です。 実はたった1行で子ロガーは親ロガーの設定を引き継いで作成できます。 LoggerChild.py ''' --------------------------- Python Logging Child ---------------------------''' import logging def Start(): ''' 子ロガーの設定 ''' _logger = logging.getLogger("App").getChild("Child") _logger.info("logging child fin.") Parent側を実行したときの結果です。 出力結果は以下となります。子ロガーの方は、nameが「App.Child」になります。 以上でloggingの解説は終了します。 ※実務で利用する場合は、適宜例外処理などは追加してください。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

頻繁に「python setup.py test」と打つのが面倒だった

はじめに 理由はわからないけど「python setup.py test」って打つのが面倒。 度々「pytyhon」なんて打ってしまう… 対処 元々、~/.inputrc にこんな設定(※一部)は入れていた。 "\C-f": reverse-search-history "\C-m": accept-line 設定を入れていなくても「Ctrl-r」でイケるけど、ブラウザーの「Ctrl-f」と合わせてると指が楽。 ここから過去コマンドを探すわけだけど、頭が悪いことに前方一致しか使っていなかった。部分一致が使えることに、今更ながらに気づいた。 で、こんな風に入力することで容易に python setup.py text が特定できるようになった。打ち慣れているのか、指も楽。 py te 以上
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

ちょっとした動作確認をするためのスクリプト、を作るためのスクリプト

はじめに perlとか、bashとか、pythonとか、プログラムを組んでいると「ちょっとした確認」のためにスクリプトを書く。 毎回作るのが煩わしかったので、こんなスクリプトを作ってPATHを通してあるディレクトリに置いた。 スクリプト 便利。 #!/usr/bin/bash echo "#!/usr/bin/python" > zzz.py echo >> zzz.py chmod u+x zzz.py ※捨てファイルは「zzz*」と決めています。 以上 他にいい方法があったら教えてください。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Python&STEP CADデータ】STEPCodeをPython3環境で利用する(※リポジトリの管理状態・運用方法に注意)

先日に引き続き、Pythonを利用してSTEPCodeライブラリをインストールする方法を紹介する。 OSはWindows 10共通で行う。 (Linuxでの導入は別途。Linuxの方が数倍楽。) Python3ライブラリの経緯 (情報:2021.10.7) 元々Python2での対応であったが、githubの有志の方@Devonsparksによって、Issue #407が提示された。 その後、Pull Requestにより本人によってコードが変更されていて、概ねテストも完了しているようだが、 githubのPR Integration(CI環境での統合テスト)で作業が止まっているようだ。 改めて確認したところ、PR Integrationで使用しているサービス(AppVeyor)がWindowsでのcmakeビルドをやっているようだが、古い環境であるとのことで最新の状態では削除されて、ブランチ「check_pr_actions」の状態で、しばらく放置されている。 少なくとも私の環境では@Devonsparksのリポジトリをクローンしても(おそらく)正常に動いているので、試しに使う分には問題ないはず。 このライブラリをPythonサーバーサイドで使うときとかは、注意して使う。 全体的にContributorが少なく、ネイティブなCの環境からゆっくりと移行しているので、最新のPython運用環境やCI環境と比べると着手が遅い。 (なので、盛り上がってくれると、その辺りがさらに良くなると期待できる!) STEPCodeの採用メリット STEPCodeは昔からネイティブCで管理されており、"STEPファイルを読み取れるライブラリでかつ制約なく商用利用出来る"意味で、独自のライブラリである。 「中・小規模のPythonのサービスでCADデータを取り扱える」 ある意味、完全に差別化されたメリットである。 ただし、基本コードは独自のスキーマで全て記述されていて、その変換コードはC++で記述されていることからすごくレガシーでもある。 (Python3対応が遅れている現状からみてもご察しかもしれないが・・・) その他のSTEP CADライブラリ Open CASCADE ちなみに、STEP CADデータが扱えるライブラリにOpen CASCADEがある。 こちらはGPLであるなどの制約があったり、どうも未定義動作やテストで課題が出ているようだが、こちらの方がよく使われている印象。 もしかすると、コミュニティにまだ人がいてフリーでコード修正できるようなContributorがいるだけでも、STEPCodeはまだよいかもしれない・・・。。。。 (確認したところ、Open CASCADEの最新アップデートは2018年のようだ。) ST Developer このほか、1991年に発案され今も管理されて続けているST Viewer (ST Developer)と呼ばれるものがある。 ただしこちらは現在は無償ではなく(昔は無償だったんだけど)、ライセンス料を支払わなくてはいけない。 このように、CADを扱えるライブラリの選択肢は非常に限られており、高額/有償の環境に頼るか、無償でチープで古臭い環境で頑張るかになる。 なので、こんな状況を少しでも変えて、少しでも布教して、色々な案件に応用出来たら・・・と思うばかりだ。 STEPCodeで提供されているPythonコードの種類 Pythonでは、Parse処理のみ使用する場合と、スキーマを利用する場合と二つの手段がある。 パース処理を利用する場合は、Pythonの固有コードのみで実装できる。非常に単純だ。 一方、AP203とかAP214とかのスキーマを定義する場合は、Generatorを使用して、pyモジュールを生成する必要があるようだ。 パース処理のみの場合はお手軽だが型の厳密性がないので、規格情報を使用する場合はこの過程を必ず挟む必要がある。 インストール① (パース処理のみ) 動作環境 Windows 10 Professional バージョン20H2 Python 3.7.8, 3.9.7で動作確認 Visual Studio Code + Python Extensionを使用 1. Githubリポジトリからクローン 今回は@Devonsparksのブランチpy3を使用する。 現状プルリクエストのレビュー中ですので、マージされたらSTEPCode本体のリポジトリからプルすると良い。 git clone https://github.com/devonsparks/stepcode cd stepcode git checkout py3 2. Python Build pipで以下のライブラリをインストール python -m pip install wheel build 手順にもあるように、PyPAモジュールを使用するのと、インストール時はwheelファイルを使用する点に注意。 PyPIにはまだSTEPCodeはリポジトリとして挙がっていない。 (いつか使えるくらいのレベルになると、とてもうれしい) 次に、リポジトリのルートから「.\src\exp2python\python」に移動し、 python -m build を押すことで、wheelファイルがdistフォルダに生成される。 終了したらこんな感じ Wheelファイルが出来たら、 cd dist python -m pip install SCL-0.6.1-py3-none-any.whl とすることでSCL(STEPCode Library)がインストールされる。 なお、Python2版を入れている場合、SCLがアンインストールできないので、--ignore-installを追加してpip installする。 3. テスト リポジトリのルートから「.\src\exp2python\python\test」に移動し python test_parser.py などとしてエラーが発生していなければ完了。 動作確認 C++で実施した機能と同じもの(単にデータ番号と型を並べるだけ)なら、規格の情報がいらないので以下の内容だけで十分実行出来る。 金型データに関しては以下のqiitaの記事を参照 kana.py from SCL.Part21 import Parser, SimpleEntity, ComplexEntity import io path="F:\\StepData\\BSP35B20-N-12.stp" step_text = None with open(path) as f: step_text = f.read() parser = Parser() model = parser.parse(step_text) with open("log_out.txt", "w") as cout: for section in model.sections: for entity in section.entities: if type(entity) is SimpleEntity: cout.write("{0},{1}\n".format(entity.ref, entity.type_name)) elif type(entity) is ComplexEntity: cout.write("{0} ComplexEntity\n".format(entity.ref)) for list_parameter in entity.params: for parameter in list_parameter: cout.write("+ {0}\n".format(parameter.type_name)) 出力結果 インストール(②スキーマも使う) 何をするか なんと、中にCで書かれた規格ファイル(*.expファイル)をPythonで自動してくれる機能があったりする。 これを使って、各規格データをpyコードで生成する。 動作環境 C++でビルド出来る環境が必要です。 追加部分は赤字で記す Windows 10 Professional バージョン20H2 Python 3.7.8, 3.9.7で動作確認 Visual Studio Code + Python Extensionを使用 cmake gui 3.19.0-rc2 現在の最新版は3.21.xなので若干動作が変わる可能性あり Visual Studio 2019 Professional C++ 14 1. cmake リポジトリのルートディレクトリをcmakeでビルドする。 流れはVC++編と同じ 入力を終えたらConfigureボタンを押す Cmake Generatorで使用するIDEを選択する この記事の作者はあまり詳しくないが、EclipseやKate, Sublimeなども指定できるようだ 終わったら「Finish」押す 終わったら「Generate」を押してプロジェクトを生成する。 2.cmakeしたソリューションを開いてビルドする ターゲットディレクトリを開くと、直下に"SC.sln"というソリューションファイルが生成されるので、 これをVisual Studio 2019で開く。 そうすると大量のC++のプロジェクトが連なって生成される。 そう、これを全てビルドするのだ。 メモリ16GB, CPU Core-i7-9700を搭載した私のPCでも1時間半ビルドに時間を要した。 それほど長い作業である。 Debugモードでビルドすると、ターゲットディレクトリに「build\bin」というフォルダが出来るはずだ。 この中にあるexp2python.exeを使用する。 とりあえず環境設定を振っておくか、このディレクトリのパスをクリップボードにコピーして、次の作業に移る 3. スキーマのスクリプトをpythonコード化 次に、リポジトリのルートディレクトリの直下に「data」というフォルダがあるので参照する。 そうすると、各規格の一覧がずらっと並んでいるのが分かるはず ここでは、例えば「ap203」を使用するものとする。 「ap203」を開いて、コマンドプロンプトで次のように書く。 exp2python ap203.exp すると、ap203のフォルダの中にconfig_control_design.pyというファイルが出来る。 これはC++の規格データからpythonコードに置き換えたものであるので、 これを後は、本体のPythonコードの近くにモジュールとして置いておけばよい。 動作確認 先ほどのkana.pyを少し変更して、 定義されているEntity名がスキーマap203に存在しているならばTrueを返す という処理を追加しよう。 ただし、スキーマのEntity名は大文字になっているので、これは小文字に変換する。 kana2.py from config_control_design import * from SCL.Part21 import Parser, SimpleEntity, ComplexEntity import io path="F:\\StepData\\BSP35B20-N-12.stp" step_text = None with open(path) as f: step_text = f.read() parser = Parser() model = parser.parse(step_text) with open('log_out2.txt', 'w') as cout: for section in model.sections: for entity in section.entities: if type(entity) is SimpleEntity: cout.write("{0},{1},{2}\n".format(entity.ref, entity.type_name, entity.type_name.lower() in globals())) elif type(entity) is ComplexEntity: for list_parameter in entity.params: for parameter in list_parameter: cout.write("+ {0} {1}\n".format(parameter.type_name, parameter.type_name.lower() in globals())) 実行結果 全数確認したが、Falseになっている項目はなく、全てのスキーマのEntityが定義されていることが判明した。 ソースコード少し解説 ちなみに感覚的には、Pythonの方が扱いやすいと思う。 理由は、クラス名を文字列から動的に名前付けする機能がPythonにはあるため、複数の規格が乱立するときにそれを一本化出来るためだ。 恐らく、Pythonを普通に使ってて、めったに使わないこの機能をガシガシ使う必要がある。 このことを知って、なかなかすごいライブラリだなとは思った。 上記のkana2.pyでいえば、 cout.write("{0},{1},{2}\n".format(entity.ref, entity.type_name, entity.type_name.lower() in globals())) 特にここで言うglobals()とか(先ほどの記事ではgetattrとか)は、普通にプログラミングしている環境だと積極的に使うケースは少ない。 だが、このSTEPCodeではこれなしではやっていけないような気すらする。 なぜこんなことが出来るか? それは、先ほど見たap203.expにスキーマの定義全てが書いてあるからだ。 そこで、一部抜粋すると ap203.exp(一部抜粋) ENTITY representation_map; mapping_origin : representation_item; mapped_representation : representation; INVERSE map_usage : SET [1:?] OF mapped_item FOR mapping_source; WHERE wr1: item_in_context(SELF.mapping_origin,SELF.mapped_representation. context_of_items); END_ENTITY; -- representation_map ENTITY representation_relationship; name : label; description : text; rep_1 : representation; rep_2 : representation; END_ENTITY; -- representation_relationship ENTITY representation_relationship_with_transformation SUBTYPE OF (representation_relationship); transformation_operator : transformation; WHERE wr1: (SELF\representation_relationship.rep_1.context_of_items :<>: SELF\representation_relationship.rep_2.context_of_items); END_ENTITY; -- representation_relationship_with_transformation こんな感じで、独自のSTEP用の言語仕様で、要素(Entity)、継承関係、属性(Attribute)が記述されているのだ。 これをpythonに変換しているものだから、スキーマとして成立するのである。 このSTEPの独自仕様に関しては、今後取り扱うとして、 このSTEPCodeがどのようなライブラリなのか、若干想像がつくと思う。 独自仕様のXMLなどに似ている形式かもしれない。 つまり、CADデータを一定のスキーマに基づいて管理するためのファイル=STEPファイルである。 そして、そのスキーマはISO10303の規格で取り決めされており、STLなどのフリーCADの形態と互換性を保っているのである。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Python&STEP CADデータ】STEPCodeをPython3&Windows環境で利用する(※リポジトリの管理状態・運用方法に注意)

先日に引き続き、Pythonを利用してSTEPCodeライブラリをインストールする方法を紹介する。 OSはWindows 10共通で行う。 (Linuxでの導入は別途。Linuxの方が数倍楽。) STEPファイルについて知りたい人はこちらを見てくれ Python3ライブラリの経緯 (情報:2021.10.7) 元々Python2での対応であったが、githubの有志の方@Devonsparksによって、Issue #407が提示された。 その後、Pull Requestにより本人によってコードが変更されていて、概ねテストも完了しているようだが、 githubのPR Integration(CI環境での統合テスト)で作業が止まっているようだ。 改めて確認したところ、PR Integrationで使用しているサービス(AppVeyor)がWindowsでのcmakeビルドをやっているようだが、古い環境であるとのことで最新の状態では削除されて、ブランチ「check_pr_actions」の状態で、しばらく放置されている。 少なくとも私の環境では@Devonsparksのリポジトリをクローンしても(おそらく)正常に動いているので、試しに使う分には問題ないはず。 このライブラリをPythonサーバーサイドで使うときとかは、注意して使う。 全体的にContributorが少なく、ネイティブなCの環境からゆっくりと移行しているので、最新のPython運用環境やCI環境と比べると着手が遅い。 (なので、盛り上がってくれると、その辺りがさらに良くなると期待できる!) STEPCodeの採用メリット STEPCodeは昔からネイティブCで管理されており、"STEPファイルを読み取れるライブラリでかつ制約なく商用利用出来る"意味で、独自のライブラリである。 「中・小規模のPythonのサービスでCADデータを取り扱える」 ある意味、完全に差別化されたメリットである。 ただし、基本コードは独自のスキーマで全て記述されていて、その変換コードはC++で記述されていることからすごくレガシーでもある。 (Python3対応が遅れている現状からみてもご察しかもしれないが・・・) その他のSTEP CADライブラリ Open CASCADE ちなみに、STEP CADデータが扱えるライブラリにOpen CASCADEがある。 こちらはGPLであるなどの制約があったり、どうも未定義動作やテストで課題が出ているようだが、こちらの方がよく使われている印象。 もしかすると、コミュニティにまだ人がいてフリーでコード修正できるようなContributorがいるだけでも、STEPCodeはまだよいかもしれない・・・。。。。 (確認したところ、Open CASCADEの最新アップデートは2018年のようだ。) ST Developer このほか、1991年に発案され今も管理されて続けているST Viewer (ST Developer)と呼ばれるものがある。 ただしこちらは現在は無償ではなく(昔は無償だったんだけど)、ライセンス料を支払わなくてはいけない。 このように、CADを扱えるライブラリの選択肢は非常に限られており、高額/有償の環境に頼るか、無償でチープで古臭い環境で頑張るかになる。 なので、こんな状況を少しでも変えて、少しでも布教して、色々な案件に応用出来たら・・・と思うばかりだ。 STEPCodeで提供されているPythonコードの種類 Pythonでは、Parse処理のみ使用する場合と、スキーマを利用する場合と二つの手段がある。 パース処理を利用する場合は、Pythonの固有コードのみで実装できる。非常に単純だ。 一方、AP203とかAP214とかのスキーマを定義する場合は、Generatorを使用して、pyモジュールを生成する必要があるようだ。 パース処理のみの場合はお手軽だが型の厳密性がないので、規格情報を使用する場合はこの過程を必ず挟む必要がある。 インストール① (パース処理のみ) 動作環境 Windows 10 Professional バージョン20H2 Python 3.7.8, 3.9.7で動作確認 Visual Studio Code + Python Extensionを使用 1. Githubリポジトリからクローン 今回は@Devonsparksのブランチpy3を使用する。 現状プルリクエストのレビュー中ですので、マージされたらSTEPCode本体のリポジトリからプルすると良い。 git clone https://github.com/devonsparks/stepcode cd stepcode git checkout py3 2. Python Build pipで以下のライブラリをインストール python -m pip install wheel build 手順にもあるように、PyPAモジュールを使用するのと、インストール時はwheelファイルを使用する点に注意。 PyPIにはまだSTEPCodeはリポジトリとして挙がっていない。 (いつか使えるくらいのレベルになると、とてもうれしい) 次に、リポジトリのルートから「.\src\exp2python\python」に移動し、 python -m build を押すことで、wheelファイルがdistフォルダに生成される。 終了したらこんな感じ Wheelファイルが出来たら、 cd dist python -m pip install SCL-0.6.1-py3-none-any.whl とすることでSCL(STEPCode Library)がインストールされる。 なお、過去にPython2版SCLを入れている場合、SCLがアンインストールできないので、--ignore-installを追加してpip installする。 3. テスト リポジトリのルートから「.\src\exp2python\python\test」に移動し python test_parser.py などとしてエラーが発生していなければ完了。 動作確認 C++で実施した機能と同じもの(単にデータ番号と型を並べるだけ)なら、規格の情報がいらないので以下の内容だけで十分実行出来る。 金型データに関しては以下のqiitaの記事を参照 kana.py from SCL.Part21 import Parser, SimpleEntity, ComplexEntity import io path="F:\\StepData\\BSP35B20-N-12.stp" step_text = None with open(path) as f: step_text = f.read() parser = Parser() model = parser.parse(step_text) with open("log_out.txt", "w") as cout: for section in model.sections: for entity in section.entities: if type(entity) is SimpleEntity: cout.write("{0},{1}\n".format(entity.ref, entity.type_name)) elif type(entity) is ComplexEntity: cout.write("{0} ComplexEntity\n".format(entity.ref)) for list_parameter in entity.params: for parameter in list_parameter: cout.write("+ {0}\n".format(parameter.type_name)) 出力結果 インストール(②スキーマも使う) 何をするか なんと、規格ファイル(*.expファイル)をPythonで自動してくれる機能があったりする。 これを使って、各規格データをpyコードで生成する。 動作環境 C++でビルド出来る環境が必要です。 追加部分は赤字で記す Windows 10 Professional バージョン20H2 Python 3.7.8, 3.9.7で動作確認 Visual Studio Code + Python Extensionを使用 cmake gui 3.19.0-rc2 現在の最新版は3.21.xなので若干動作が変わる可能性あり Visual Studio 2019 Professional C++ 14 1. cmake リポジトリのルートディレクトリをcmakeでビルドする。 流れはVC++編と同じ 入力を終えたらConfigureボタンを押す Cmake Generatorで使用するIDEを選択する この記事の作者はあまり詳しくないが、EclipseやKate, Sublimeなども指定できるようだ 終わったら「Finish」押す 終わったら「Generate」を押してプロジェクトを生成する。 2.cmakeしたソリューションを開いてビルドする ターゲットディレクトリを開くと、直下に"SC.sln"というソリューションファイルが生成されるので、 これをVisual Studio 2019で開く。 そうすると大量のC++のプロジェクトが連なって生成される。 そう、これを全てビルドするのだ。 メモリ16GB, CPU Core-i7-9700を搭載した私のPCでも1時間半ビルドに時間を要した。 それほど長い作業である。 Debugモードでビルドすると、ターゲットディレクトリに「build\bin」というフォルダが出来るはずだ。 この中にあるexp2python.exeを使用する。 とりあえず環境設定を振っておくか、このディレクトリのパスをクリップボードにコピーして、次の作業に移る 3. スキーマのスクリプトをpythonコード化 次に、リポジトリのルートディレクトリの直下に「data」というフォルダがあるので参照する。 そうすると、各規格の一覧がずらっと並んでいるのが分かるはず ここでは、例えば「ap203」を使用するものとする。 「ap203」を開いて、コマンドプロンプトで次のように書く。 exp2python ap203.exp すると、ap203のフォルダの中にconfig_control_design.pyというファイルが出来る。 これはC++の規格データからpythonコードに置き換えたものであるので、 これを後は、本体のPythonコードの近くにモジュールとして置いておけばよい。 動作確認 先ほどのkana.pyを少し変更して、 定義されているEntity名がスキーマap203に存在しているならばTrueを返す という処理を追加しよう。 ただし、スキーマのEntity名は大文字になっているので、これは小文字に変換する。 kana2.py from config_control_design import * from SCL.Part21 import Parser, SimpleEntity, ComplexEntity import io path="F:\\StepData\\BSP35B20-N-12.stp" step_text = None with open(path) as f: step_text = f.read() parser = Parser() model = parser.parse(step_text) with open('log_out2.txt', 'w') as cout: for section in model.sections: for entity in section.entities: if type(entity) is SimpleEntity: cout.write("{0},{1},{2}\n".format(entity.ref, entity.type_name, entity.type_name.lower() in globals())) elif type(entity) is ComplexEntity: for list_parameter in entity.params: for parameter in list_parameter: cout.write("+ {0} {1}\n".format(parameter.type_name, parameter.type_name.lower() in globals())) 実行結果 全数確認したが、Falseになっている項目はなく、全てのスキーマのEntityが定義されていることが判明した。 ソースコード少し解説 ちなみに感覚的には、Pythonの方が扱いやすいと思う。 理由は、クラス名を文字列から動的に名前付けする機能がPythonにはあるため、複数の規格が乱立するときにそれを一本化出来るためだ。 恐らく、Pythonを普通に使ってて、めったに使わないこの機能をガシガシ使う必要がある。 このことを知って、なかなかすごいライブラリだなとは思った。 上記のkana2.pyでいえば、 cout.write("{0},{1},{2}\n".format(entity.ref, entity.type_name, entity.type_name.lower() in globals())) 特にここで言うglobals()とか(先ほどの記事ではgetattrとか)は、普通にプログラミングしている環境だと積極的に使うケースは少ない。 だが、このSTEPCodeではこれなしではやっていけないような気すらする。 なぜこんなことが出来るか? それは、先ほど見たap203.expにスキーマの定義全てが書いてあるからだ。 そこで、一部抜粋すると ap203.exp(一部抜粋) ENTITY representation_map; mapping_origin : representation_item; mapped_representation : representation; INVERSE map_usage : SET [1:?] OF mapped_item FOR mapping_source; WHERE wr1: item_in_context(SELF.mapping_origin,SELF.mapped_representation. context_of_items); END_ENTITY; -- representation_map ENTITY representation_relationship; name : label; description : text; rep_1 : representation; rep_2 : representation; END_ENTITY; -- representation_relationship ENTITY representation_relationship_with_transformation SUBTYPE OF (representation_relationship); transformation_operator : transformation; WHERE wr1: (SELF\representation_relationship.rep_1.context_of_items :<>: SELF\representation_relationship.rep_2.context_of_items); END_ENTITY; -- representation_relationship_with_transformation こんな感じで、独自のSTEP用の言語仕様で、要素(Entity)、継承関係、属性(Attribute)が記述されているのだ。 これをpythonに変換しているものだから、スキーマとして成立するのである。 このSTEPの独自仕様に関しては、今後取り扱うとして、 このSTEPCodeがどのようなライブラリなのか、若干想像がつくと思う。 独自仕様のXMLなどに似ている形式かもしれない。 つまり、CADデータを一定のスキーマに基づいて管理するためのファイル=STEPファイルである。 そして、そのスキーマはISO10303の規格で取り決めされており、STLなどの形状定義フリーCADの形態と互換性を保っているのである。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【Python&STEP CADデータ】STEPCodeをPython3&Windows環境で利用する(※注意)

先日に引き続き、Pythonを利用してSTEPCodeライブラリをインストールする方法を紹介する。 OSはWindows 10共通で行う。 (Linuxでの導入は別途。Linuxの方が数倍楽。) STEPファイルについて知りたい人はこちらを見てくれ Python3ライブラリの経緯 (情報:2021.10.7) 元々Python2での対応であったが、githubの有志の方@Devonsparksによって、Issue #407が提示された。 その後、Pull Requestにより本人によってコードが変更されていて、概ねテストも完了しているようだが、 githubのPR Integration(CI環境での統合テスト)で作業が止まっているようだ。 改めて確認したところ、PR Integrationで使用しているサービス(AppVeyor)がWindowsでのcmakeビルドをやっているようだが、古い環境であるとのことで最新の状態では削除されて、ブランチ「check_pr_actions」の状態で、しばらく放置されている。 少なくとも私の環境では@Devonsparksのリポジトリをクローンしても(おそらく)正常に動いているので、試しに使う分には問題ないはず。 このライブラリをPythonサーバーサイドで使うときとかは、注意して使う。 全体的にContributorが少なく、ネイティブなCの環境からゆっくりと移行しているので、最新のPython運用環境やCI環境と比べると着手が遅い。 (なので、盛り上がってくれると、その辺りがさらに良くなると期待できる!) STEPCodeの採用メリット STEPCodeは昔からネイティブCで管理されており、"STEPファイルを読み取れるライブラリでかつ制約なく商用利用出来る"意味で、独自のライブラリである。 「中・小規模のPythonのサービスでCADデータを取り扱える」 ある意味、完全に差別化されたメリットである。 ただし、基本コードは独自のスキーマで全て記述されていて、その変換コードはC++で記述されていることからすごくレガシーでもある。 (Python3対応が遅れている現状からみてもご察しかもしれないが・・・) その他のSTEP CADライブラリ Open CASCADE ちなみに、STEP CADデータが扱えるライブラリにOpen CASCADEがある。 こちらはGPLであるなどの制約があったり、どうも未定義動作やテストで課題が出ているようだが、こちらの方がよく使われている印象。 もしかすると、コミュニティにまだ人がいてフリーでコード修正できるようなContributorがいるだけでも、STEPCodeはまだよいかもしれない・・・。。。。 (確認したところ、Open CASCADEの最新アップデートは2018年のようだ。) ST Developer このほか、1991年に発案され今も管理されて続けているST Viewer (ST Developer)と呼ばれるものがある。 ただしこちらは現在は無償ではなく(昔は無償だったんだけど)、ライセンス料を支払わなくてはいけない。 このように、CADを扱えるライブラリの選択肢は非常に限られており、高額/有償の環境に頼るか、無償でチープで古臭い環境で頑張るかになる。 なので、こんな状況を少しでも変えて、少しでも布教して、色々な案件に応用出来たら・・・と思うばかりだ。 STEPCodeで提供されているPythonコードの種類 Pythonでは、Parse処理のみ使用する場合と、スキーマを利用する場合と二つの手段がある。 パース処理を利用する場合は、Pythonの固有コードのみで実装できる。非常に単純だ。 一方、AP203とかAP214とかのスキーマを定義する場合は、Generatorを使用して、pyモジュールを生成する必要があるようだ。 パース処理のみの場合はお手軽だが型の厳密性がないので、規格情報を使用する場合はこの過程を必ず挟む必要がある。 インストール① (パース処理のみ) 動作環境 Windows 10 Professional バージョン20H2 Python 3.7.8, 3.9.7で動作確認 Visual Studio Code + Python Extensionを使用 1. Githubリポジトリからクローン 今回は@Devonsparksのブランチpy3を使用する。 現状プルリクエストのレビュー中ですので、マージされたらSTEPCode本体のリポジトリからプルすると良い。 git clone https://github.com/devonsparks/stepcode cd stepcode git checkout py3 2. Python Build pipで以下のライブラリをインストール python -m pip install wheel build 手順にもあるように、PyPAモジュールを使用するのと、インストール時はwheelファイルを使用する点に注意。 PyPIにはまだSTEPCodeはリポジトリとして挙がっていない。 (いつか使えるくらいのレベルになると、とてもうれしい) 次に、リポジトリのルートから「.\src\exp2python\python」に移動し、 python -m build を押すことで、wheelファイルがdistフォルダに生成される。 終了したらこんな感じ Wheelファイルが出来たら、 cd dist python -m pip install SCL-0.6.1-py3-none-any.whl とすることでSCL(STEPCode Library)がインストールされる。 なお、過去にPython2版SCLを入れている場合、SCLがアンインストールできないので、--ignore-installを追加してpip installする。 3. テスト リポジトリのルートから「.\src\exp2python\python\test」に移動し python test_parser.py などとしてエラーが発生していなければ完了。 動作確認 C++で実施した機能と同じもの(単にデータ番号と型を並べるだけ)なら、規格の情報がいらないので以下の内容だけで十分実行出来る。 金型データに関しては以下のqiitaの記事を参照 kana.py from SCL.Part21 import Parser, SimpleEntity, ComplexEntity import io path="F:\\StepData\\BSP35B20-N-12.stp" step_text = None with open(path) as f: step_text = f.read() parser = Parser() model = parser.parse(step_text) with open("log_out.txt", "w") as cout: for section in model.sections: for entity in section.entities: if type(entity) is SimpleEntity: cout.write("{0},{1}\n".format(entity.ref, entity.type_name)) elif type(entity) is ComplexEntity: cout.write("{0} ComplexEntity\n".format(entity.ref)) for list_parameter in entity.params: for parameter in list_parameter: cout.write("+ {0}\n".format(parameter.type_name)) 出力結果 インストール(②スキーマも使う) 何をするか なんと、規格ファイル(*.expファイル)をPythonで自動してくれる機能があったりする。 これを使って、各規格データをpyコードで生成する。 動作環境 C++でビルド出来る環境が必要です。 追加部分は赤字で記す Windows 10 Professional バージョン20H2 Python 3.7.8, 3.9.7で動作確認 Visual Studio Code + Python Extensionを使用 cmake gui 3.19.0-rc2 現在の最新版は3.21.xなので若干動作が変わる可能性あり Visual Studio 2019 Professional C++ 14 1. cmake リポジトリのルートディレクトリをcmakeでビルドする。 流れはVC++編と同じ 入力を終えたらConfigureボタンを押す Cmake Generatorで使用するIDEを選択する この記事の作者はあまり詳しくないが、EclipseやKate, Sublimeなども指定できるようだ 終わったら「Finish」押す 終わったら「Generate」を押してプロジェクトを生成する。 2.cmakeしたソリューションを開いてビルドする ターゲットディレクトリを開くと、直下に"SC.sln"というソリューションファイルが生成されるので、 これをVisual Studio 2019で開く。 そうすると大量のC++のプロジェクトが連なって生成される。 そう、これを全てビルドするのだ。 メモリ16GB, CPU Core-i7-9700を搭載した私のPCでも1時間半ビルドに時間を要した。 それほど長い作業である。 Debugモードでビルドすると、ターゲットディレクトリに「build\bin」というフォルダが出来るはずだ。 この中にあるexp2python.exeを使用する。 とりあえず環境設定を振っておくか、このディレクトリのパスをクリップボードにコピーして、次の作業に移る 3. スキーマのスクリプトをpythonコード化 次に、リポジトリのルートディレクトリの直下に「data」というフォルダがあるので参照する。 そうすると、各規格の一覧がずらっと並んでいるのが分かるはず ここでは、例えば「ap203」を使用するものとする。 「ap203」を開いて、コマンドプロンプトで次のように書く。 exp2python ap203.exp すると、ap203のフォルダの中にconfig_control_design.pyというファイルが出来る。 これはC++の規格データからpythonコードに置き換えたものであるので、 これを後は、本体のPythonコードの近くにモジュールとして置いておけばよい。 動作確認 先ほどのkana.pyを少し変更して、 定義されているEntity名がスキーマap203に存在しているならばTrueを返す という処理を追加しよう。 ただし、スキーマのEntity名は大文字になっているので、これは小文字に変換する。 kana2.py from config_control_design import * from SCL.Part21 import Parser, SimpleEntity, ComplexEntity import io path="F:\\StepData\\BSP35B20-N-12.stp" step_text = None with open(path) as f: step_text = f.read() parser = Parser() model = parser.parse(step_text) with open('log_out2.txt', 'w') as cout: for section in model.sections: for entity in section.entities: if type(entity) is SimpleEntity: cout.write("{0},{1},{2}\n".format(entity.ref, entity.type_name, entity.type_name.lower() in globals())) elif type(entity) is ComplexEntity: for list_parameter in entity.params: for parameter in list_parameter: cout.write("+ {0} {1}\n".format(parameter.type_name, parameter.type_name.lower() in globals())) 実行結果 全数確認したが、Falseになっている項目はなく、全てのスキーマのEntityが定義されていることが判明した。 ソースコード少し解説 ちなみに感覚的には、Pythonの方が扱いやすいと思う。 理由は、クラス名を文字列から動的に名前付けする機能がPythonにはあるため、複数の規格が乱立するときにそれを一本化出来るためだ。 恐らく、Pythonを普通に使ってて、めったに使わないこの機能をガシガシ使う必要がある。 このことを知って、なかなかすごいライブラリだなとは思った。 上記のkana2.pyでいえば、 cout.write("{0},{1},{2}\n".format(entity.ref, entity.type_name, entity.type_name.lower() in globals())) 特にここで言うglobals()とか(先ほどの記事ではgetattrとか)は、普通にプログラミングしている環境だと積極的に使うケースは少ない。 だが、このSTEPCodeではこれなしではやっていけないような気すらする。 なぜこんなことが出来るか? それは、先ほど見たap203.expにスキーマの定義全てが書いてあるからだ。 そこで、一部抜粋すると ap203.exp(一部抜粋) ENTITY representation_map; mapping_origin : representation_item; mapped_representation : representation; INVERSE map_usage : SET [1:?] OF mapped_item FOR mapping_source; WHERE wr1: item_in_context(SELF.mapping_origin,SELF.mapped_representation. context_of_items); END_ENTITY; -- representation_map ENTITY representation_relationship; name : label; description : text; rep_1 : representation; rep_2 : representation; END_ENTITY; -- representation_relationship ENTITY representation_relationship_with_transformation SUBTYPE OF (representation_relationship); transformation_operator : transformation; WHERE wr1: (SELF\representation_relationship.rep_1.context_of_items :<>: SELF\representation_relationship.rep_2.context_of_items); END_ENTITY; -- representation_relationship_with_transformation こんな感じで、独自のSTEP用の言語仕様で、要素(Entity)、継承関係、属性(Attribute)が記述されているのだ。 これをpythonに変換しているものだから、スキーマとして成立するのである。 このSTEPの独自仕様に関しては、今後取り扱うとして、 このSTEPCodeがどのようなライブラリなのか、若干想像がつくと思う。 独自仕様のXMLなどに似ている形式かもしれない。 つまり、CADデータを一定のスキーマに基づいて管理するためのファイル=STEPファイルである。 そして、そのスキーマはISO10303の規格で取り決めされており、STLなどの形状定義フリーCADの形態と互換性を保っているのである。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

エンコーディング

この記事の狙い・目的 機械学習を取り入れたAIシステムの構築は、 ①データセット作成(前処理)→ ②モデルの構築 → ③モデルの適用 というプロセスで行っていきます。 その際「データセット作成(前処理)」の段階では、正しくモデル構築できるよう、事前にデータを整備しておくことが求めらます。 このブログでは、その際「特徴量エンジニアリング」として用いられることの多い「エンコーディング」手法について解説していきます。 プログラムの実行環境 Python3 MacBook pro(端末) PyCharm(IDE) Jupyter Notebook(Chrome) Google スライド(Chrome) 使用するデータセット import pandas as pd train_df = pd.read_csv("./train.csv") train_df.shape train_df.head() 確認の結果、「名義尺度」は「状態1、販売条件」、「順序尺度」は「全体的な品質、地下室の状態」であることがわかりました。 データセット:House Prices - Advanced Regression Techniques - kaggle エンコーディング エンコーディングとは、カテゴリ(質的)変数を数値に変換(符号化 (encode))する手法のことです。 カテゴリ変数とは カテゴリ(質的)変数とは、以下の「名義尺度」と「順序尺度」の尺度で表される変数のことを言います。 名義尺度とは、他と区別・分類するための尺度。例えば、性別、血液型などです。 順序尺度とは、順序に意味はあるが、それらの間隔には意味がない尺度。例えば、順序、ランクなどです。 なぜカテゴリ変数の変換が必要なのか 多くのアルゴリズムではカテゴリカルな変数(例:性別・ランク)は、そのままの形(≠数値)では学習させることができません。その為、何らかの量的変数に置き換えて表現する必要があるから エンコーディング手法 それではカテゴリ変数を変換する様々な手法について、解説していきます。 1.One-Hotエンコーディング OneーHotエンコーディングとは、対象のカテゴリー変数の構成要素分の変数を作成し、作り出した各変数は行単位で「該当なし:0、該当あり:1」を変換する手法です。 メリット カテゴリカル変数のそれぞれの要素に応じて重みづけができ、正確な予測ができる場合がある。 また、One-Hotエンコーディングであれば、後に紹介する「Labelエンコーディング」と異なり、数値の大小関係を各変数に持たせないため、線形回帰やニューラルネットのアルゴリズムでも有効な場合がある。 デメリット カテゴリ数が非常に多いデータセットに対してOne-Hotエンコーディングをすると、ランダムフォレストなどランダムに特徴量を選択するアルゴリズムでは、数値変数などの非カテゴリカル変数の選択率が減少して精度が落ちる場合がある。 ※その場合は、PCAやEmbeddingと組み合わせて次元を削減するにも一つの手と言えるでしょう。 ※生成された各変数間には、線形の依存関係が出てしまいます。これも精度の低下を引き起こす原因となる場合があるため注意が必要です。 train_df['状態1'].value_counts() train_oh = pd.DataFrame() train_oh = pd.get_dummies(train_df['状態1'], prefix='状態1', prefix_sep='_') train_oh.shape train_oh.head(10) 2.ダミーコーディング ダミーコーディングも、(One-Hotエンコーディング同様)カテゴリ変数を数値変換する手法です。One-Hotエンコーディングと異なる点は、生成する特徴量が(One-Hotエンコーディングより)1つ少ない点です。 もう少し詳しく解説していきましょう。 One-Hotエンコーディングの問題点として、生成された各変数間には、線形の依存関係があるという点がありました。 線形の依存関係がある特徴量の場合、線形モデルの回帰係数(重み)が一意に定められないという問題があることに注意が必要です。 というのも、2つの特徴量が強く相関している(線形性がある)場合、特徴量の効果が相加的であり、相関する特徴量のどちらが影響を与えているのか不定となるため、回帰係数(重み)の推定が不安定となります。 k個すべての特徴量を生成してしまった場合、ダミー変数に拘束関係が生まれてしまい、先述の通り回帰分析(のによる重み付け)がうまくできなくなってしまうことが想定されます。 この問題に対処するため、ダミーコーディングでは、生成する特徴量数を「K-1」個、つまり最初の列を削除する(drop_first)方法を取ります。 拘束関係とは、1つの変数が他の変数を使用して簡単に予測できることを意味します。 メリット (One-Hotエンコーディングと比べ)ダミーコーディングの方が多重共線性を回避することができる。 デメリット カテゴリカル変数の要素の数だけカラムが増え、スパースなデータとなり、結果、メモリを消費が大きくなる。 それでは実装と結果を見ていきましょう。 train_df['状態1'].value_counts() train_dm = pd.DataFrame() train_dm = pd.get_dummies(train_df['状態1'], prefix='状態1', prefix_sep='_', drop_first = True) train_dm.shape train_dm.head(10) # prefix – カラムラベルに付け加える文字列 # prefix_sep – prefixとの繋ぎ文字 # drop_first – k-1個のダミー変数を作成するかどうか 3.Effectコーディング 参照カテゴリとは、ダミーコーディングのように参照カテゴリを[0]で表現するのではなく、[-1]で表現する手法です。 メリット 多重共線性を回避するもう一つの方法である。 欠損データにも対応できる。 デメリット 「0〜1」ではなく[-1〜1]で表現するため、使用するメモリ領域が増え、結果的に計算コストが増えてしまう。 train_ef = train_dm.copy() train_ef['状態1_Norm'] = train_ef['状態1_Norm'].where(train_ef['状態1_Feedr'] == 0, -1) # 条件の結果がFalseの値を置換 train_ef[['状態1_Feedr', '状態1_Norm']].head(10) 4.ラベル・エンコーディング ラベル・エンコーディングとは、ラベルの種類に応じた数値を割り当てて特徴量を生成する手法です。 LabelEncoderで1つの変数を離散値として扱う場合は、決定木ベース、ランダムフォレストなどの分析と相性が良いです。 反対に、線形回帰やニューラルネットなど、重み(係数)と特徴量の線形関係のデータが入力値となるアルゴリズムの場合には有効とは言えません。 メリット カラム数(次元数)はそのままで、数値に置き換わるので、メモリを節約できる。 デメリット 欠損値に対応していない、どうなる? ラベル間の数値の大小関係を学んでしまう。その為、発生頻度は低いが重要な変数などを学習しづらい。 エンコーディングの際、無意味な順序付けが発生する。 注意点 順序性がない値、すなわち名義尺度に適用するのは避ける必要があります。思わぬ順序生をモデルに学習させてしまう可能性があります。 その為、各変数の順序性は事前に吟味する必要があるでしょう。より意味のあるデータにする為には、明示的に順位を割り当てる必要もあるでしょう。 また、順位間の差が一定でないものは、Labelエンコーディングにより一定に変換されてしまう為、別の手法を検討する必要があるでしょう。 # LabelEncoder : 3値以上のラベル分類 from sklearn.preprocessing import LabelEncoder train_df['外部の品質'].value_counts() train_lbl = train_df.copy() lbl = LabelEncoder() train_lbl['外部の品質_enc'] = lbl.fit_transform(train_df['外部の品質']) train_lbl[['外部の品質', '外部の品質_enc']].head() 5.Ordinalエンコーディング Ordinalエンコーディングとは、カテゴリ変数の項目数に対応して連番、または指定番号(ラベル)を振った列を作成する手法です。 挙動はLabeleエンコーディングと似ていますが、変換後の値を指定することもできます。実装と結果を見ていきましょう。 メリット 順序性を考慮してエンコーディングができ、Labelエンコーディングの問題を解決できる。 デメメリット Labelエンコーディング同様、順位間の差が一定でないものに対しては、その差を一律にしてしまう為、有効な手法ではない。 map_dict = {'TA': 0, 'Gd': 1, 'Ex': 2, 'Fa': 3} def map_values(x): return map_dict[x] train_oe = train_df.copy() train_oe['外部の品質_oe'] = train_oe['外部の品質'].apply(lambda x: map_values(x)) train_oe[['外部の品質', '外部の品質_oe']].tail() from sklearn.preprocessing import OrdinalEncoder # OrdinalEncoder : 複数項目に対する、一括ラベルエンコーディング train_oe = train_df.copy() oe = OrdinalEncoder() train_oe[['外部の品質_oe', '外部の条件_oe']] = oe.fit_transform(train_df[['外部の品質', '外部の条件']]) # 欠損値不可 train_oe[['外部の品質', '外部の品質_oe', '外部の条件', '外部の条件_oe']].tail() train_df['外部の品質'].value_counts() train_oe['外部の品質_oe'].value_counts() print('='*30) train_df['外部の条件'].value_counts() train_oe['外部の条件_oe'].value_counts() 6.Feature hashing Feature hashingとは、カテゴリ変数に対しハッシュ関数を用いて指定した大きさのハッシュへ変換し、さらにOne-Hotエンコーディングを行う手法です。 メリット 変換前のカテゴリ変数の水準数より、変換後の特徴量の数を少なくすることができる。 ※One-Hotエンコーディングと異なり、列の増加を抑制することができる。 その為、カテゴリの水準数が多く、One-Hotエンコーディングを行うと特徴量が膨大になってしまう場合に利用することが多い手法です。 デメリット One-Hotエンコーディングより次元数を少なくできる反面、衝突が起きる可能性もあります。 衝突とは、水準数を少なくしているため、ハッシュ関数の計算によっては異なる水準でも同じ場所にフラグが立ってしまうことを言います。 また、変換過程にハッシュ値を用いていることから直感的な解釈が難しくなり、データの探索や可視化には不向きと言えます。 from sklearn.feature_extraction import FeatureHasher from sys import getsizeof tgt = train_df['全体的な品質'].astype('str') tgt.value_counts() m = len(tgt.unique()) print(f'要素数(次元数): {m}') # Feature hasing h = FeatureHasher(n_features=m, input_type='string') f = h.fit_transform(tgt) # 疎行列(スパース行列)を、NumPy配列 ndarray(密行列(非スパース行列)) に変換 f.toarray() # 見づらいためDataFrameに変換 df = pd.DataFrame(data = f.todense().astype(str), columns = ["ハッシュ{}".format(x+1) for x in range(m)]) df.head() # 変換前後のデータ容量の確認 print(f'メモリサイズ(前): {getsizeof(tgt)}') print(f'メモリサイズ(後): {getsizeof(f)}') 7.カウント(Frequency)エンコーディング カウント(Frequency)エンコーディングとは、特徴量の各値の出現頻度で置き換える手法です。 また、エンコーディング後の値の大小(出現頻度)に意味があり、そのことが目的変数と相関がある場合は、線形・非線形どちらでも有効な手法と言えます。 メリット 次元を増さずにエンコーディングできる。 デメリット 外れ値も全て拾ってしまう為、外れ値に敏感。ただし、対数変換すれば感度を弱められることがある。 異なるカテゴリカル変数に同じ値が割り当てられ、情報が失われる場合がある。 (例: A,Bの出現回数がそれぞれ100であれば、A→1, B→1と割り当てられてしまう) count_df = train_df.copy() count_map = count_df['全体的な品質'].value_counts().to_dict() count_map count_df['全体的な品質_cnt'] = count_df['全体的な品質'].map(count_map) count_df[['全体的な品質', '全体的な品質_cnt']].head() 8.ラベル・カウント(Ranked frequency)エンコーディング ラベル・カウントエンコーディングとは、特徴量の各値の出現頻度で順位付けし、その順位に置き換える手法です。 メリット データの間隔が全て1になるため、外れ値の影響も小さくなる。 デメリット 出現回数が同じカテゴリカル変数に対して、無意味な順序が導入されてしまう。 rank_df = train_df.copy() rank_map = rank_df['全体的な品質'].value_counts().rank(ascending=False).to_dict() rank_map rank_df['全体的な品質_cnt'] = count_df['全体的な品質'].map(rank_map) rank_df[['全体的な品質', '全体的な品質_cnt']].head() 9.ビンカウンティング ビンカウンティングとは、カテゴリ変数を、カテゴリに関する統計量に置き換える手法です。 目的変数に対して、有用な集計量を設定する必要がある。 カウントエンコーディングも集計値を利用していることから、これも一種のビンカウンティングの手法と言えるでしょう。 またビンカウンティングは、交互作用項を生成する際などにも利用できます。交互作用項とは、関係のある2つの変数をかけ合わせた新しい変数のことを言います。 メリット 大量の特徴量がある場合、統計量の取り方次第で次元数を減らすことができる。 デメメリット 目的変数の値を使用した統計量を学習させたい場合、データ分割(Fold)等を施さなければ、本来予測時に与えられない情報を用いていることになり、不適切な学習となってしまうため注意が必要。 bin_count = train_df.copy() bin_df = pd.DataFrame() taeget = bin_count.groupby('MSサブクラス').agg({'暖炉': 'mean'}) bin_df = pd.merge(bin_count, taeget, on='MSサブクラス', right_index=True)[['MSサブクラス', '暖炉_y']].astype(str) bin_df.head() bin_df.tail() 9.Nonエンコーディング Nonエンコーディングとは、NaN値がある場合、(NaN自体にも情報はあることがあるため)その情報を保持したダミー変数を作る メリット 欠損値に意味のある場合は効果的。 デメメリット 意味のない欠損値に対してエンコーディングした際には、ノイズを多く学習し、その結果精度の低下や、オーバーティングを引き起こす原因ともなる。 # 欠損値の再確認 num = missing_values1['地下室の状態'].T.values[0] print(f'欠損値の数: {num}') # 要素の確認 train_df['地下室の状態'].value_counts() train_non = pd.DataFrame() train_non = pd.get_dummies(train_df['地下室の状態'], prefix='地下室の状態', prefix_sep='_', drop_first = True, dummy_na=True) train_non.shape train_non.head(10) まとめ 以上、様々なエンコーディング手法について解説してきました。 どれもメリット、デメリットがあり、有効な場面は限定される為、各手法の採用根拠を明確にして利用要否を判断していきたいと思います。 参考文献 統計Web 変数の尺度 南山大学 理工学研究センター - 最小二乗法による線形回帰分析 sklearn.feature_extraction.FeatureHasher sklearn.preprocessing.OrdinalEncoder 解析結果 github/house_prices.ipynb データセット:House Prices - Advanced Regression Techniques - kaggle 参考資料
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Anaconda3をPowerShellで使ったときにハマったところメモ

PowerShellにてAnacoda3の仮想環境の切り替えが機能せず、無駄に時間が過ぎたので備忘録。 conda init powershellを実行する が、主要フォルダが日本語化されている場合(e.g. document→ドキュメント)、 エラーでMicrosoft.PowerShell_profile.ps1にAnaconda3の仮想環境の設定が書き込まれず。 そのため、手動で書き込むことに。 ii $PROFILE or code $PROFILEで手動書き込み 自動書き込み内容は以下。しかし利便性を追求したため、後述のカスタマイズを実行 *Microsoft.PowerShell_profile.ps1がない場合: PowerShellでプロファイルを作成する方法を参考にしてください。 Microsoft.PowerShell_profile.ps1 #region conda initialize # !! Contents within this block are managed by 'conda init' !! Write-Output "Type ``ve h` to get help." Write-Output "conda venv (base) enabled." (& "conda.exe" "shell.powershell" "hook") | Out-String | Invoke-Expression #endregions ve(virtual enviroment)という関数名を登録する ハイフン(-)によるオプションを一考後、打つのがめんどいという理由でハイフンなしに決定 Microsoft.PowerShell_profile.ps1 #region conda initialize function ve { if ($Args[0] -eq "d") { Write-Output "conda venv disenabled." return (& "conda.exe" "shell.powershell" "deactivate") | Out-String | Invoke-Expression } if ($Args[0] -eq "h") { return Write-Output @" Anaconda3の仮想環境へ入るためのエイリアス ve : Enter base. 仮想環境(base)に入ります ve <venv name> : Activate. 仮想環境(作成済みの環境)に入ります ve d : Deactivate. 仮想環境から出ます ve h : Get help. このヘルプを表示します "@ } if ($Args[0]) { Write-Output "conda venv ($Args) enabled." return (& "conda.exe" "shell.powershell" "activate" $Args[0]) | Out-String | Invoke-Expression } # !! Contents within this block are managed by 'conda init' !! Write-Output "Type 've h' to get help." Write-Output "conda venv (base) enabled." (& "conda.exe" "shell.powershell" "hook") | Out-String | Invoke-Expression } #endregions
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

M1 MacbookAirのPython環境構築(Miniforge)

最近バッテリーが60%までしか持たなくなった2017年版MacbookAirを卒業し、M1 MacbookAirを購入しました。新しくPythonの環境構築をすることになったので、過程をここにメモしておきます? この解説で使っているパソコン環境 参考にした記事 今回の環境構築方法は、基本的にこれらの記事で紹介してくれている手順を参考にしたものになります。 miniforgeをインストールする ↑のリンクからMiniforgeのリポジトリにアクセスし、下にスクロールしていくとダウンロードするリンクが出てきます。 今回はMacOSの中でもM1(Apple Silicon)なので、「Minoforge3-MacOSX-arm64」をクリックしましょう。 次にダウンロードが完了したらTerminalを開き、下記のコマンドを実行します。 cd ~/Downloads bash Miniforge3-Linux-x86_64.sh 実行するとインストールが始まります。あとは指示に従ってEnterを押したり同意書を読んでyesを入力しましょう。もしここで詰まった場合、こちらの記事のStep2で、どこでEnterを押すのか、何のコマンドを打てば良いのか画像で詳しく解説されているので参考にしてみてください。 Thank you for installing Miniforge3!と表示されればインストール完了です。 インストールが完了したら、一度Terminalを閉じ、再度新しく開きます。 conda --version 再度開いたTerminalで上記コマンドを実行し、conda 4.10.3のようなバージョンが表示されればインストールは成功です。 Pythonをインストール 以下のコマンドを実行します conda create -n python39_env python=3.9 「python39_env」というところは今回作成するPythonの仮想環境の名前なので、各自で好きな名前に変えて大丈夫です。 実行すると、途中でProceed ([y]/n)?と確認されるので、yと打ってEnterを押すと先に進みます。 実行が完了すると以下の文言が表示されます。 # # To activate this environment, use # # $ conda activate python39_env # # To deactivate an active environment, use # # $ conda deactivate ここで表示されている二つのコマンドはPython環境を立ち上げるときや終了するときに必要なコマンドです。(控えておくと便利かもしれません。) 作成したPythonの仮想環境を立ち上げる 先ほど控えたactivateコマンドを実行します。実行すると(python39_env)が左につきます。 仮想環境名を変えていた場合は、「python3_env」の部分を差し替えてください。 conda activate python39_env お疲れ様でした!ここまでくれば、あとは必要なPythonライブラリをインストールしたり、Pythonスクリプトを実行したりと自由にできます。 # pandasのライブラリをインストールする例 conda install pandas Pythonのバージョン確認のコマンドを打つと、指定したバージョンがインストールされていることが確認できます。 python --version # Python 3.9.7 立ち上げていた仮想環境を終了したい場合は、deactivateのコマンドからできます。 conda deactivate
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

PythonとC++が出来る人はRPAのOSSに貢献チャンスあるよ

プロローグ @umireonさんの貢献チャンスの探し方を見てふとヒラメキ。 「そういえば、PythonとC++が出来る人にもチャンスあったな。」 と思ったので、紹介します。 チャンスって? 「Pythonの自動化ライブラリって、C++系のポインタとかの知識が無いと書けなくて、実装が遅れてるんだよね」というチャンスです。 RPAが次第に当たり前のものとなりつつある現代、「退屈なことはPythonにやらせよう」なんてタイトルの本を読んで、業務を自動化しようとしたエンジニアは多いことでしょう。しかし、OSSではまだまだ発展途上の領域であり、「全部入り」みたいな覇権を握るOSSは無い感じです。 「自動化して、その後のメンテナンスも考えて」となると、Pythonに軍配が上がることは多い印象です。スクリプト言語であることと、自動化用ライブラリの多さが決め手です。また、Pythonが使えるエンジニアの総数が多いこともあります。 Pythonなどの言語を使わずに、ローコードの PowerAutomate Desktop などでやった方が早いことも多いです。ただし、バージョン管理などの機能が無いことを覚えおきましょう。 PythonでRPAしてみよう Pythonに限らず、ソースコードを自分で書いてRPAをする場合、主に環境固有の問題が障壁になりやすいです。 モバイル・デスクトップのアプリを操作する場合のことです。 画像認識AIっていいよね それを解決するのが画像認識技術であり、ここから人工知能ブームが RPA のブームを後押しする形だと分かります。これを最大の売りにした SikuliX がかなり便利です。画像認識機能には Hewlett-Packard と Google の下で強化された Tesseract が利用されています。 しかし、Jythonなので外部ライブラリの利用方法が複雑であり、カスタマイズ利用の敷居が高かったりします。おかげで適用できる範囲が狭くなっています。Java + Python + C++ ってかなり高難度ですね。 Xpathもあるよね 「いやいや、画像認識なんて要らないんだ。」 「機械学習するからGPU積めだと?高いんだよ!」 「htmlみたいに構造を辿ればいいじゃないか。」 という流派が、SeleniumとかAppiumです(誤解多め)。もともと、Webのランタイムテスト用のテスト環境統合ツールとしてSeleniumがあります。そこに拡張を施して、iOSアプリやAndroidアプリ、Windowsアプリをデバッグ出来るようにしたものが、Appiumです。 しかし、このAppium、激重いんですね。特にERPとか大きなアプリだと顕著です。Xpathとかカッコいいもの使って、あたかも「UI内を効率よく検索して、目的のUIをハンドリングできる」ように見えます。ところが、ハイスペックPCを所持していないと使い物になりません。RPAプログラムの開発では、実際の業務の流れに沿って操作対象アプリを実際に起動しながら開発するため、遅いと開発効率に直結したりします。モジュール分割により開発速度が改善することは稀でしょう。 それならPythonだけで解決しよう だからといって、Python単体でやろうとすると、PythonとWindowsの架け橋がC++なので、Windows自動化系を書くにはC++の知識が必要だったりします。主にポインタ関連。またまた敷居。 Pythonって自動化ライブラリ無いの? それなら、自動化用のライブラリを探したらいいのでは? あります。 PyAutoGui 画像認識に特化 PyWinAuto UI操作に特化 てことで、Appiumでやれたこと、SikuliXでできたこと、それぞれのライブラリがあります。 そして、この2つのライブラリ、依存関係が複雑で、一緒に使わないほうがいいです。 「統合しちゃえばいいのに」ですね。 統合できないの? 統合というか、PyAutoGUIには近しい予定があります。実現すればPyWinAutoが要らなくなるものです。 PyAutoGUI のロードマップ PyAutoGUI のロードマップを見てみましょう。 Windowsを制御する機能の実装予定 これらが実装予定の Windows API 関連機能です。 pyautogui.getWindows() # returns a dict of window titles mapped to window IDs pyautogui.getWindow(str_title_or_int_id) # returns a “Win” object win.move(x, y) win.resize(width, height) win.maximize() win.minimize() win.restore() win.close() win.position() # returns (x, y) of top-left corner win.moveRel(x=0, y=0) # moves relative to the x, y of top-left corner of the window win.clickRel(x=0, y=0, clicks=1, interval=0.0, button=’left’) # click relative to the x, y of top-left corner of the window Additions to screenshot functionality so that it can capture specific windows instead of full screen. そこで、「Pythonの自動化ライブラリって、C++系のポインタとかの知識が無いと書けなくて、実装が遅れてるんだよね」という話が出てきます。そうなると、「両方できる人って少ないし、しょうがないよね」とかなってきます。 これって実は大チャンスなわけですね。 ドキュメントには、これから実装したい機能のロードマップが掲載されていて、もう何年も達成されていません。主に、Windows API 関連の機能です。ところが、このライブラリの利用者は多く、プルリクやイシューもたくさん来ています。これはチャンス! 分かった貢献しよう!でもどうやって? 簡単な流れはこうです。 追加する機能を選ぶ Forkする 機能の実装に使えそうな Windows API 関数を見つける 対応するPythonのラッパー関数を見つける 実装する DocStringを書く ドキュメントを書く PullRequestする この流れについて、要所の話をしていきましょう。もちろん、Gitの説明は省きます。 Windows API って? Win32 API インデックスポータルから引用すると、 Win32 API (Windows API とも呼ばれる) は、Windowsアプリのネイティブプラットフォームです。 この API は、システム機能とハードウェアに直接アクセスする必要があるデスクトップアプリに最適です。 Windows API は、すべてのデスクトップ アプリで使用できます。同じ関数は、通常、32 ビットと 64 ビットのWindowsアプリケーションでサポートされます。 と書いてあります。 Windowsをネイティブレベルで操れるっぽいことが書いてあります。多くのアプリは、こういったネイティブAPIを介して動作しているわけです。もちろん、開発者でも直接触ることは稀で、ReactNative自体の開発者だったり、Xamarin自体の開発者が扱うような領域です。RPAでも、自作アプリ開発でも、ラッパーを介さずにWindows APIが直接使えるなら、できることの幅が広がるのですね。ただし、ラッパーを使うことで得られる利点とのトレードオフは考慮する必要がありそうです。 ということで今回、PythonとC++のなのでラッパーライブラリを使うことになります。 ラップ前の関数のうち、よく使うものがWindows API インデックスにまとめられています。 具体的には、winuser.h headerとかにある関数が当てはまります。 FindWindowA FindWindowExA FindWindowExW FindWindowW とかです。 Win API をPythonの中でどうやって使う? まず、Windows API はC系なのでctypesが必要になります。次に、WindowなのでWinDllが必要です。 また、ウィンドウを操作する関数が入っているuser32も必要です。 WindowsでPythonをインストールすると、上の3つは標準装備されています。 これにより、例えばC++のFindWindowW関数を、Pythonではctypes.windll.user32.FindWindowW()という感じで使えます。また、C++で言う&numのようなポインターをPythonで使いたい場合はctypes.pointer(num)を使います。 Python ライブラリの作成方法 Windows API の使い方はこれで分かりました。次にPythonのライブラリの作り方です。(場所だけ) Documentation » Python チュートリアル » 6. モジュール そして、Pythonでライブラリの機能を追加したら、ドキュメントも書かなくてはなりません。そこで、 sphinx mkdocs などの静的サイトジェネレータでドキュメントを管理している部分を書き換えましょう。 例えば? 例えば、どんな機能を追加していくのか? サンプルがあったほうが、分かりやすいですよね。 def GetWindowRectFromName(name:str)-> tuple: """ this can return window rect from window title """ hwnd = ctypes.windll.user32.FindWindowW(0, name) rect = ctypes.wintypes.RECT() ctypes.windll.user32.GetWindowRect(hwnd, ctypes.pointer(rect)) # print(hwnd) # print(rect) return (rect.left, rect.top, rect.right, rect.bottom) if __name__ == "__main__": print(GetWindowRectFromName('CALC')) pass このGetWindowRectFromName関数は、ウィンドウタイトルから、ウィンドウの位置を取ってくれる機能があります。 このように、Forkしたら関数を作ってみて、出来あがったらPullRequestするわけです。 まとめ C++とPythonが大好きな皆さん。 または、この記事を読んで、「複数の言語の組み合わさる領域ってチャンスかも?!」と思った、そこのあなた! そのスキルでOSSに貢献してみてはいかがでしょうか。 Excelsior!
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Azure の使用料金を Azure Blob に 保存してみました

概要 このプログラム(Python で Azure の リソースグループ単位で日々の使用料金を取得し、ローカルファイルに保存してみました) を Azure Functions に登録して自動実行させ、Azure Blob にデータアップロード する仕組みに改変しました。Blob に保存されたデータは、Azure PaaSを使用したデータ分析の1つの元データになればと、、、、と考えてます。 実行環境 macOS Big Sur 11.1 python 3.8.3 Azure CLI 2.28.0 Azure Functions Core Tools 3.0.3785 Azure Function Runtime Version: 3.2.0.0 事前の準備 ストレージアカウントの登録 ## 使用するテナントのAzure環境へのログイン $ az login --tenant <tenant_id> ## 使用サブスクリプションを定義します $ az account set --subscription '<Subscription名>' ## リソースグループの作成 $ az group create --resource-group <ResourceGroup名> --location japaneast ## ストレージアカウントの作成 $ az storage account create --name <StorageAccount名> --resource-group <ResourceGroup名> --location japaneast --sku Standard_LRS ## コンテナの作成 $ az storage container create --name usage-summary-data 実行するプログラム用の ServicePrincipal の作成 この記事 にありますように、ServicePrincipal のクライアントシークレットの有効期限はデフォルトで1年、最大2年までの設定となります。そこで Azure CLI から以下のような手順で、有効期限 29年 での ServicePrincial を作成します(現時点では問題なく作成できました)。 ## 最初は2年で作成します(ロール割当なしで) $ az ad sp create-for-rbac --name sp_usagecostmanage --skip-assignment --years 2 ## 作成された内容を取得できることを確認します(いきなり29年で作成した場合、取得できません、、、) $ az ad sp list --display-name sp_usagecostmanage ## その後、29年で再作成します(ロール割当なしで) $ az ad sp create-for-rbac --name sp_usagecostmanage --skip-assignment --years 29 { "appId": "xxxxxxxx-xxxx-5757-4774-xxxxxxxxxxxx", --> 関数アプリの構成で AZURE_CLIENT_ID として登録 "displayName": "sp_usagecostmanage", "name": "xxxxxxxx-xxxx-5757-4774-xxxxxxxxxxxx", "password": "hogehogehogehogehogehogehogehogege", --> 関数アプリの構成で AZURE_CLIENT_SECRET として登録 "tenant": "zzzzzzzz-cccc-4645-5757-zzzzzzzzzzzz" --> 関数アプリの構成で AZURE_TENANT_ID として登録 } ## 必要なスコープに必要なロールを割り与えます $ APP_ID=$(az ad sp list --display-name sp_usagecostmanage --query '[].{ID:appId}' --output tsv) ### データを保存のために、スコープ:containers ロール:Contributor とします $ az role assignment create \ --assignee $APP_ID \ --role "Storage Blob Data Contributor" \ --scope /subscriptions/<Subscription-ID>/resourceGroups/<ResouceGroup名>/providers/Microsoft.Storage/storageAccounts/<StorageAccount名>/blobServices/default/containers/usage-summary-data ### 複数Subscriptionの利用料金取得のため、スコープ:Subscription ロール:Reader とします $ az role assignment create --assignee $APP_ID --scope /subscriptions/<xxx-SubscriptionID> --role Reader $ az role assignment create --assignee $APP_ID --scope /subscriptions/<yyy-SubscriptionID> --role Reader $ az role assignment create --assignee $APP_ID --scope /subscriptions/<zzz-SubscriptionID> --role Reader Functions(関数アプリ)の作成 ## Functionアプリの作成 az functionapp create -g <ResourceGroup名> -n func-CostToBlob -s <StorageAccount名> --runtime python --runtime-version 3.7 --consumption-plan-location japaneast --os-type Linux --functions-version 2 ## 作成した関数アプリの構成設定 ### 関数アプリ実行のための ServicePrincipal 情報の定義 az functionapp config appsettings set -n func-CostToBlob -g <ResourceGroup名> \ --settings "AZURE_TENANT_ID=zzzzzzzz-cccc-4645-5757-zzzzzzzzzzzz" az functionapp config appsettings set -n func-CostToBlob -g <ResourceGroup名> \ --settings "AZURE_CLIENT_ID=xxxxxxxx-xxxx-5757-4774-xxxxxxxxxxxx" az functionapp config appsettings set -n func-CostToBlob -g <ResourceGroup名> \ --settings "AZURE_CLIENT_SECRET=hogehogehogehogehogehogehogehogege" ### 定義情報の確認 az functionapp config appsettings list -n func-CostToBlob -g <ResourceGroup名> -o table Functionsのためのローカル環境の作成 ## Functionのプロジェクトディレクトリの作成 (base)$ mkdir Functions (base)$ cd Functions ## プロジェクト用のPython仮想環境の構築 (base)$ python -m venv .venv (base)$ source .venv/bin/activate (.venv) (base)$ python --version Python 3.8.3 ## Functionのプロジェクトの作成 (.venv) (base)$ func init CostToBlob --python Found Python version 3.8.3 (python3). Writing requirements.txt Writing .funcignore Writing getting_started.md Writing .gitignore Writing host.json Writing local.settings.json Writing /Users/ituru/MyDevelops/AzureCostManagement/Functions/CostToBlob/.vscode/extensions.json ## Functionの作成 (.venv) (base)$ cd CostToBlob (.venv) (base)$ func new Select a number for template: 1. Azure Blob Storage trigger 2. Azure Cosmos DB trigger 3. Durable Functions activity 4. Durable Functions entity 5. Durable Functions HTTP starter 6. Durable Functions orchestrator 7. Azure Event Grid trigger 8. Azure Event Hub trigger 9. HTTP trigger 10. Kafka output 11. Kafka trigger 12. Azure Queue Storage trigger 13. RabbitMQ trigger 14. Azure Service Bus Queue trigger 15. Azure Service Bus Topic trigger 16. Timer trigger Choose option: 16 Timer trigger Function name: [TimerTrigger] CostToBlobMontly Writing /Users/ituru/MyDevelops/AzureCostManagement/Functions/CostToBlob/CostToBlobMontly/readme.md Writing /Users/ituru/MyDevelops/AzureCostManagement/Functions/CostToBlob/CostToBlobMontly/__init__.py Writing /Users/ituru/MyDevelops/AzureCostManagement/Functions/CostToBlob/CostToBlobMontly/function.json The function "CostToBlobMontly" was created successfully from the "Timer trigger" template. ## ディレクトリ構成の確認 (.venv) (base)$ tree -a . ├── .funcignore ├── .gitignore ├── .vscode │   └── extensions.json ├── CostToBlobMontly │   ├── __init__.py │   ├── function.json │   └── readme.md ├── getting_started.md ├── host.json ├── local.settings.json └── requirements.txt 2 directories, 10 files 実行するPythonプログラムの組込み 実行プログラム ./CostToBlobMontly/__init__.py import logging import time import json import azure.functions as func from azure.identity import DefaultAzureCredential from azure.mgmt.resource import SubscriptionClient from azure.mgmt.costmanagement import CostManagementClient from azure.storage.blob import BlobServiceClient from datetime import datetime, timezone, timedelta import pandas as pd CONTAINER_NAME = 'usage-summary-data' # 接続しているテナントのサブスクリプションを操作するオブジェクトを取得 def GetSubscriptionObject(): subscription_client = SubscriptionClient( credential=DefaultAzureCredential() ) return subscription_client # CostManagement情報 を操作するオブジェクトを取得 def GetCostManagementObject(): costmgmt_client = CostManagementClient( credential=DefaultAzureCredential() ) return costmgmt_client # Blobを操作するオブジェクトを取得 def GetBlobServiceObject(): blob_service_client = BlobServiceClient( account_url="https://usagecostmanage.blob.core.windows.net", credential=DefaultAzureCredential() ) return blob_service_client # Containerを操作するオブジェクトを取得 def GetContainerObject(blob_service_client): container_client = blob_service_client.get_container_client(CONTAINER_NAME) return container_client # 指定した Subscription について CostManagement からコストを取得 def GetCostManagement(costmgmt_client, subs_id): # Query costmanagement(先月の情報を取得する) SCOPE = '/subscriptions/{}'.format(subs_id) costmanagement = costmgmt_client.query.usage( SCOPE, { "type": "Usage", "timeframe": "TheLastMonth", "dataset": { "granularity": "Daily", "aggregation": { "totalCost": { "name": "PreTaxCost", "function": "Sum" } }, "grouping": [ { "type": "Dimension", "name": "ResourceGroup" } ] } } ) return costmanagement # サブスクリプションIDを指定しリソースグループ毎に CostManagement情報を取得 def GetSubscriptionCsotManagement(): # サブスクリプションを操作するオブジェクトの取得 subscription_list = GetSubscriptionObject() # CostManagementを操作するオブジェクトの取得 costmgmt_client = GetCostManagementObject() # Blobを操作するオブジェクトを取得 blob_service_client = GetBlobServiceObject() # Containerを操作するオブジェクトを取得 container_client = GetContainerObject(blob_service_client) # 取得コストの キーと値 row_key = ['UsageCost', 'Date', 'ResourceGroup', 'Currency'] row_value = [] # サブスクリプション毎に CostManagement からコストを取得 for n, subs in enumerate(subscription_list.subscriptions.list()): # 指定のサブスクリプションの CostManagement からコストを取得 logging.info("\n\n ##### サブスクリプション : {} #####".format(subs.display_name)) costmanagement = GetCostManagement(costmgmt_client, subs.subscription_id) # rowsカラムデータを取得し、サブスクリプションのコスト合計値の取得 SubTotalCost = sum(cost[0] for cost in costmanagement.rows) # その合計値が「0」の場合、次のサブスクリプションへ if SubTotalCost == 0 : continue # 取得したコスト情報を日付で昇順ソートし、辞書型に変換する row_value = sorted(costmanagement.rows, key=lambda x:x[1], reverse=False) row_dict = [dict(zip(row_key,item)) for item in row_value] # リソースグループでソートして、とりあえずファイルに保存しておく rows = sorted(row_dict, key=lambda x:x['ResourceGroup'], reverse=False) UsageDataToBlobContainer(rows, subs.display_name, blob_service_client) # break # コンテナ一覧の取得 BlobList(container_client) # クローズ処理 costmgmt_client.close() subscription_list.close() container_client.close() blob_service_client.close() # サブスクリプション毎に CostManagement情報を取得 def UsageDataToBlobContainer(rows, subname, blob_service_client): # データを DataFrame化し、不必要項目(Currency)の削除と必要項目(Subscription)の追加 dfl=pd.DataFrame(rows) dfl.drop("Currency", axis=1, inplace=True) dfl['Subscription'] = subname # json形式への変換 rowl_key = ['UsageCost', 'Date', 'ResourceGroup', 'Subscription'] rowl_dict = [dict(zip(rowl_key,item)) for item in dfl.values] # 保存するファイル名の生成 now = datetime.now() # blob_name = './output/UsageCost_' + subname + '_' + now.strftime('%Y%m%d_%H%M%S') + '.json' blob_name = now.strftime('%Y%m%d_%H%M%S') + '_' + subname + '.json' try: # Create a blob client using the blob_name as the name for the blob blob_client = blob_service_client.get_blob_client(container=CONTAINER_NAME, blob=blob_name) # Blob Upload blob_client.upload_blob(json.dumps(rowl_dict)) except Exception as ex: logging.info('Exception:') logging.info(ex) # Blob一覧の取得 def BlobList(container_client): logging.info("\n\n === Blob 一覧の表示 ===") try: # List the blobs in the container logging.info("\nListing blobs...") blob_list = container_client.list_blobs() for blob in blob_list: logging.info("\t" + blob.name) except Exception as ex: logging.info('Exception:') logging.info(ex) def main(mytimer: func.TimerRequest) -> None: # 日本時間の取得 JST = timezone(timedelta(hours=+9), 'JST') today = datetime.now(JST) if mytimer.past_due: logging.info('■ ■ ■ The timer is past due! ■ ■ ■') logging.info('■ ■ ■ Python timer trigger function (func-CostToBlob) ran at %s ■ ■ ■', today) start = time.time() GetSubscriptionCsotManagement() generate_time = time.time() - start logging.info("処理時間:{0}".format(generate_time) + " [sec]") 実行に必要なPythonパッケージの定義 requirements.txt azure-functions azure-identity azure-mgmt-resource azure-mgmt-costmanagement azure-storage-blob tabulate requests pandas 実行スケジュール(タイマートリガー)の設定 毎月7日 14:20:00(JST)(毎月7日 5:20:00(UTC)) に実行 /CostToBlobMontly/function.json { "scriptFile": "__init__.py", "bindings": [ { "name": "mytimer", "type": "timerTrigger", "direction": "in", "schedule": "0 20 5 7 * *" } ] } Azure Functions をローカル環境でのテスト プログラムの実行 ## Pythonパッケージのインストール (.venv) (base)$ pip install -r requirements.txt ## Azure Functions 構成情報のローカルに設定 (.venv) (base)$ func azure functionapp fetch-app-settings func-CostToBlob App Settings: Loading FUNCTIONS_WORKER_RUNTIME = ***** Loading FUNCTIONS_EXTENSION_VERSION = ***** Loading AzureWebJobsStorage = ***** Loading WEBSITE_CONTENTAZUREFILECONNECTIONSTRING = ***** Loading WEBSITE_CONTENTSHARE = ***** Loading APPINSIGHTS_INSTRUMENTATIONKEY = ***** Loading AZURE_TENANT_ID = ***** Loading AZURE_CLIENT_ID = ***** Loading AZURE_CLIENT_SECRET = ***** Connection Strings: ## ローカル環境での実行 (.venv) (base)$ source ../.venv/bin/activate (.venv) (base)$ func start --verbose Found Python version 3.8.3 (python3). %%%%%% %%%%%% @ %%%%%% @ @@ %%%%%% @@ @@@ %%%%%%%%%%% @@@ @@ %%%%%%%%%% @@ @@ %%%% @@ @@ %%% @@ @@ %% @@ %% % Azure Functions Core Tools Core Tools Version: 3.0.3785 Commit hash: db6fe71b2f05d09757179d5618a07bba4b28826f (64-bit) Function Runtime Version: 3.2.0.0 : 省略 : [2021-10-02T03:48:00.398Z] Host started (526ms) [2021-10-02T03:48:00.398Z] Job host started [2021-10-02T03:48:04.946Z] Host lock lease acquired by instance ID '000000000000000000000000DDDDDDDD'. : 省略 : Azureへのデプロイ デプロイの実行 ## Pythonパッケージのインストール (.venv) (base)$ func azure functionapp publish func-CostToBlob Uploading 7.67 KB [###############################################################################] Remote build in progress, please wait... : 省略 : Uploading built content /home/site/artifacts/functionappartifact.squashfs for linux consumption function app... Resetting all workers for func-costtoblob.azurewebsites.net Deployment successful. Remote build succeeded! Syncing triggers... Functions in func-CostToBlob: CostToBlobMontly - [timerTrigger] あとは、時間が来たら、日単位でリソースグループ毎にサマリされた Azure利用料金がサブスクリプション毎ファイリングされ、Azure Blob の指定コンテナにアップロードされます。 まとめ いろいろと手こずりましたが、無事 Azure Functions で自作のPythonプログラムを稼働させることができました。Azure Function は手間がかかって、、、、、AWSのサクッと感がほしいです。 参考記事 以下の記事を参考にさせていただきました。感謝申し上げます。 MacにPython3をインストールし環境構築【決定版】
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonの仮想環境の構築についてやったのでメモ

仮想環境とは 仮想環境では、すべてのPythonのパッケージから切り離して、独自のパッケージをインストールできる仮想のファイルを作成してくれます。プロジェクトごとに必要なパッケージを用意できるので、モジュールの管理も簡単。 仮想環境の作成手順 01. 仮想環境を作成するファイルの階層にて、 python -m venv ll_env *(ll_env)は仮想環境の名前なので、任意の名前 02. 仮想環境を有効化 source ll_env/bin/activate *(ll_env)は仮想環境の名前なので、任意の名前 03. 仮想環境の無効化 deactivate
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

pythonで、CPPのヘッダ部分をかき集めてCSVに書く!

こんな感じでパス設定すればOK #! /usr/bin/env python # -*- coding: utf-8 -*- import glob import csv if __name__ == '__main__': filelist=[] filelist.extend(glob.glob('./src/**/*.cpp', recursive=True)) filelist.extend(glob.glob('./src/**/*.c', recursive=True)) filelist.extend(glob.glob('./inc/**/*.hpp', recursive=True)) filelist.extend(glob.glob('./inc/**/*.h', recursive=True)) header_list=[] header_list.append(["src","line","include"]) for filepath in filelist: print(filepath) with open(filepath, encoding='utf-8') as f: for i, line in enumerate(f): if '#include' in line: line = line.replace('\n','') header_list.append([filepath,str(i),line]) with open('./sample_writer.csv', 'w') as f: writer = csv.writer(f) writer.writerows(header_list)
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【学習記録】フラグ管理 (paizaランク D 相当)

目次 問題文 書いたコード 解答例 学んだこと 振り返り 問題文 詳細はコチラ 問題文の要約 一行目に以降の入力件数を、二行目以降に7があればTrue、なければFalseを表示せよ。 入力例2 3 4 11 35 出力例2 NO 書いたコード(その1) main.py count = int(input()) for (a, i) in enumerate(range(count)): b = input().split() if str(7) in b: print("YES") break else: if (a+1) == count: print("NO") 今までは一つ一つYES,NOを表示していくパターンが多かったので、どうやって一度だけprintを表示すればいいのか迷いました。酷いな…。 今まで使ったことなかったbreakを使えたのは収穫です(苦し紛れ…。) 書いたコード(その2) main.py count = int(input()) box = [] for i in range(count): box.append(input()) if str(7) in box: print("YES") else: print("NO") この記事を書いている間に思いつきました。単純に最後に評価すればいいじゃんと思ったのがこちら。 判定する値が一つだけじゃなくて複数になったらsetを使えばよさそう。 参考URL 解答例 main.py n = int(input()) flag = False for i in range(n): a = int(input()) if a == 7: flag = True if flag: print("YES") else: print("NO") 最初の段階でFalseを入れておいて、見つかったらTrueを代入する。 この方法なら条件を7の倍数とかにしても簡単に対応できるので応用が効く。 タイトルがフラグ管理、となっているのにも納得です。 学んだこと フラグの管理 一言 無駄だらけのコードではあったけど、enumerateの使い方には慣れてきた実感。 一つの問題に複数コードが書けたのは多少なりとも引き出しが広がった感じがします。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Pythonによる統計学〜t検定編〜

はじめに 教育心理学系の学部の方から対応の無いt検定をPythonで行いたいという質問をいただきました。 今回紹介する対応の無いt検定は実際の分析でも使用することも多く、統計的仮説検定を実務に活用するというとっかかりとしても勉強しておいて損はないと思います。 また、2標本t検定の中でも等分散性を仮定できない場合に実行するウェルチのt検定を中心に解説しようと思います。(汎用性が高く、例えばRではデフォルトの設定がウェルチのt検定です。) ※最後にでも記載しましたが、統計的仮説検定そのものに対する問題点やp値についての議論はここでは行いません。 主に使用する関数 #  両側検定(SciPyのデフォルト) stats.ttest_ind(data1, data2, equal_var=False) # 片側検定 stats.ttest_ind(data1, data2, equal_var=False, alternative="less") # または、 stats.ttest_ind(data1, data2, equal_var=False, alternative="greater") データを所有しており、すぐにでも実行したいという方向けに先に関数だけ紹介しておきます。 data1, data2には配列を入れてください。 データ生成 サンプルデータを作成します。 group1 = np.array([82, 82, 80, 78, 81, 76, 80]) group2 = np.array([83, 80, 83, 83, 84, 83, 83, 83]) # それぞれのデータの平均値 np.mean(group1) np.mean(group2) 出力 >>> group1 array([82, 82, 80, 78, 81, 76, 80]) >>> group2 array([83, 80, 83, 83, 84, 83, 83, 83]) >>> np.mean(group1) 79.85714285714286 >>> np.mean(group2) 82.75 平均値が79.86と82.75でこの違いに意味があるのかを見ることがt検定の目的です。 また、group1のサンプルサイズは7で、group2のサンプルサイズは8です。サンプルサイズが違うことにより対応がないこともお分かりいただけるかと思います。 (教育分野のお話でしたので、指導法別にグループを分けて、なんらかのテストを行ったことを想定しています。) 対応の無いt検定(両側検定)の実行 ウェルチ(Welch)のt検定を行います。 有意水準をα=0.05と設定し、帰無仮説を「2グループ間の平均値に差がないこと」とします。 こちらの式で検定統計量を計算します。 t_0 = \frac{| \overline{x}_1 - \overline{x}_2 |} {\sqrt{\frac{s_1^2}{n_1} + \frac{s_2^2}{n_2}}} stats.ttest_ind(group1, group2, equal_var=False) 出力 Ttest_indResult(statistic=-3.125418086176548, pvalue=0.012436368672610056) p値を(pvalue)を見ると p < 0.05 なので帰無仮説は棄却されます。 対応の無いt検定(片側検定) 両側検定のみで片側検定のやり方が記載されていない記事が多かったので、片側検定のやり方についても共有しようと思います。 SciPyの古いバージョン(version 1.6.0以前)では、パラメータを変更するだけで実行することができなかったということが関係しているのかもしれません。 対応の無いt検定(片側検定)の実行 有意水準をα=0.05と設定し、帰無仮説を「2グループ間の平均値に差がないこと」とします。 上記両側検定との違いは、片側検定なので対立仮説が「group1はgroup2より平均値が小さい」というようなものになります。 stats.ttest_ind(group1, group2, equal_var= False, alternative="less") 出力 Ttest_indResult(statistic=-3.125418086176548, pvalue=0.006218184336305028) p < 0.05 なので帰無仮説は棄却されます。 最後に 本記事では、対応のない2標本t検定の実行方法について紹介しました。実行だけであれば一行のコードで済むのは便利に感じます。 また、統計的仮説検定そのものに対する問題点やp値についての議論はここでは行わないようにしますのでご了承くださいませ。 この記事は以下の情報を参考にして執筆しました。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

seleniumで遊ぼう! 〜自動ログイン編〜

スクレイピングやブラウザの自動操作に興味が湧いたので、 とりあえずselenium使ってみることにしました! とりあえず、使ってみるにしても何か身の回りの作業をやってもらいたかったので、今回は、大学のオンライン授業サイトに自動でログインするツールを作ってみました! 最初にソースコードを、 loginTool from selenium import webdriver from selenium.webdriver.common.keys import Keys import chromedriver_binary # ブラウザを開く。 driver = webdriver.Chrome() # 授業サイトを開く。 driver.get("ここに自分の大学のURLを貼り付けてね") #ユーザーidの要素を取得 element1 = driver.find_element_by_id("username") #ユーザーidの入力 element1.send_keys("ここにユーザーidを入力してね!") #パスワードの要素を取得 element2 = driver.find_element_by_id("password") #パスワードを入力 element2.send_keys("パスワード入力してね!") #ログインボタンの要素を取得 btn = driver.find_element_by_id("LoginBtn") #ログインボタンをクリック btn.send_keys(Keys.ENTER) とりあえずこんな感じです。 pythonで初めて作ったツールなのでかけた時は嬉しかったですね。 とりあえず初回なのでここで終わります。読んでくれた方ありがとうございました!
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【FF14】出現頻度が低い魚が釣れる時間帯を通知してくれるdiscordbotを作った【釣り】

FF14と釣り ファイナルファンタジーXIV(FF14)とは、株式会社スクウェア・エニックスが開発、運営をしているMMORPGゲームです。 MMORPGとは、普通のゲームとは異なりサーバーに常にゲーム世界が存在し、たくさんのプレイヤーが1つの時間と空間を共有してプレイするオンラインゲームです。 そんなFF14には、漁師という職業があり、FF14の広大な世界で釣りをすることができます。 魚が生息している場所に赴き、適した釣り餌を用意して、いろいろな技を駆使して魚を釣り上げ、図鑑を埋めていく… ちょっとテクニカルなどうぶつの森、みたいな感じです。 各地域には「ヌシ」「オオヌシ」と呼ばれる、特定の条件を満たした場合のみ釣ることができるレアな魚が存在します。 その中には、ゲームの中の天候に出現が依存し、月に数回しか獲得チャンスがない魚が何匹か、存在します。漁師を極めようとする猛者たちは、スケジュールとにらめっこしながら世界を旅しています。 そんな魚の希少な出現タイミングを逃さないために、スケジュールを参照できる先人が作った素敵なwebサービスがいくつかあるのですが、実際にそのタイミングを通知してくれるツールは存在しませんでした。自分がプレイするにあたって、通知が欲しいなと感じたので、今回discordbotを使用して実現してみました。 discordbotを選んだ理由としては、以下になります。 ・discordはゲームプレイヤーに最も親しまれている通話ツールであり、FF14との親和性が高いと感じた ・Pythonで開発がしたかった ・フロント側をdiscordに依存できるため楽に開発できる 構想→コーディング→デプロイ→記事作成まで、作業したのは約5日程度でした。 時間・天候の仕様について FF14の世界、エオルゼアにはエオルゼア時間(以下ET)という概念が存在し、ゲームの中の世界はエオルゼア時間で進んでいきます。 ETでの24時間は地球時間(以下LT)での70分に相当します。 エオルゼア時間(ET) 地球時間(LT) 24時間 70分 8時間 23分20秒 1時間 2分55秒 天候はET8時間ごとに各エリアで別々に変化していきます。 また、ET8時間ごとに朝昼夜の区別があります。 朝:ET 00:00 - 07:59 昼:ET 08:00 - 15:59 夜:ET 16:00 - 23:59 天気予報サイト。ET-LT-天候-朝昼夜の関係性が分かりやすい。 たとえば、レア魚「紅龍」を釣りたい場合。 紅玉海(The Ruby Sea)というエリアで、天候が夜…雷から朝…曇りになった際の朝に釣ることが可能。 参考: 魚が釣れる時刻のリストをつくる 現在のETや各エリアの天候を取得できるパッケージがあるので、これを用いて魚が釣れるLTを算出する 1.天候を計算するための基準時間を求める 現在時刻(ET)をもとに、天候が変化してからどれだけのLTが経ったのかを計算する 現在時刻(LT)からそれを引き算し、現在の天候に変化した時刻を基準時間とする ソースコード schedule_calc.py #天候が変わってからLTでどれだけ経過したか elapsed_LT = int(Decimal(str((Eorzea_time.hour % 8 * 175 + Eorzea_time.minute * 2.917) / 60)).quantize(Decimal('0'), rounding=ROUND_HALF_UP)) #現在の天候に変化した時刻を基準時間とする base_time = datetime.datetime.now(pytz.timezone('Asia/Tokyo')) - datetime.timedelta(minutes=elapsed_LT) #秒以下を0埋め base_time = datetime.datetime(base_time.year, base_time.month, base_time.day, base_time.hour, base_time.minute) 2.朝・昼・夜のリストを作成する 上記パッケージでは朝・昼・夜の情報を取得できなかったため、現在時刻(ET)をもとに、ゲーム内の朝・昼・夜を判定し、リストを作成する ソースコード schedule_calc.py if Eorzea_time.hour < 8: term = 'Morning' term_list = ['Morning', 'Day', 'Night'] elif Eorzea_time.hour < 16: term = 'Day' term_list = ['Day', 'Night', 'Morning'] else: term = 'Night' term_list = ['Night', 'Morning', 'Day'] #朝昼夜のリスト作成 term_list = term_list * int(step / 3) #天候の取得時、0番目は1つ前の天候が格納されるため、それに対応 term_list.insert(0, 'term0') 3.釣れる時刻の計算 天候のリストを取得する 魚が釣れる条件をif節に記載し、現在の天候が始まったLTと、そこからの朝・昼・夜のリストを用いて釣れる時間を算出、リストに格納する ソースコード schedule_calc.py #天候の取得 t = tuple(EorzeaTime.weather_period(step=step)) dates=[] #検索(紅龍)...i分後 if fish == 'The Ruby Dragon': weather = EorzeaWeather.forecast('The Ruby Sea', t, strict=True) for i in range(step): if weather[i-1] == 'Thunder' and weather[i] == 'Clouds' and term_list[i] == 'Morning': d, h, m = calc_minute_after(i, term) date = base_time + datetime.timedelta(days=d,hours=h,minutes=m) dates.append(date) return dates schedule_calc.py def calc_minute_after(i, term): #ET8時間は23分20秒だが、簡略化するために朝23/昼23/夜24で計算する calc_minutes = (i - 1) * 23 + (i - 1) // 3 #計算スタートが昼・夜だった場合1分調整 if term == 'Day' or term == 'Night': calc_minutes += 1 d = calc_minutes // 60 // 24 h = calc_minutes // 60 % 24 m = calc_minutes % 60 return d, h, m discordbotに組み込む 今回実装した機能は3つです。 ①通知機能…メインの機能です。魚が釣れるタイミングの30分前/15分前/5分前に通知してくれます。 60秒に1度ループし、実装されている魚が次に釣れる時刻と現在時刻(LT)の30分前/15分前/5分前を突き合わせ、一致した場合、該当の魚のロールが付与されたユーザー向けにメッセージを送信します。 ソースコード schedule_calc.py # 60秒に一回ループ @tasks.loop(seconds=60) async def loop(): # 現在の時刻 now = datetime.datetime.now(pytz.timezone('Asia/Tokyo')) #秒以下を0埋め now = datetime.datetime(now.year, now.month, now.day, now.hour, now.minute) #テスト用 #now = datetime.datetime(2021, 10, 5, 10, 13) print(now) channel = client.get_channel(CHANNEL_ID_MENTION) async def send_Notice(fishing_date, role_ID, minutes_ago): date_ago = fishing_date - datetime.timedelta(minutes=minutes_ago) #現在時刻がx分前だった場合に通知 if now == date_ago: await channel.send('<@&' + str(role_ID) + '> ' + str(minutes_ago) + '分前\n' + fishing_date.strftime('%Y/%m/%d (%a) %H:%M')) for value in fish_dict.values(): fishing_date = schedule_calc(value['name_EN'])[0] #30分前 await send_Notice(fishing_date, value['role_ID'], 30) #15分前 await send_Notice(fishing_date, value['role_ID'], 15) #5分前 await send_Notice(fishing_date, value['role_ID'], 5) ②通知のON/OFF機能…プリフィックス(!) + 魚の名前で通知を有効化/無効化することができます。 discordのロール機能を用いて、ロールの付与/剥奪を切り替えています。 ソースコード schedule_calc.py # メッセージ受信時に動作する処理 @client.event async def on_message(message): guild = client.get_guild(SERVER_ID) #登録 if message.channel.id == CHANNEL_ID_REGISTER: for v in fish_dict.values(): if message.content == '!' + v['name_JA']: role = guild.get_role(v['role_ID']) if role in message.author.roles: await message.author.remove_roles(role) await message.reply(v['name_JA'] + 'のタイマーをOFFにしました') else: await message.author.add_roles(role) await message.reply(v['name_JA'] + 'のタイマーをONにしました') ③スケジュール確認機能…プリフィックス(!) + 魚の名前で直近のスケジュールを確認することができます。 先述した魚が釣れる時刻のリストをメッセージとして出力しています。 ソースコード schedule_calc.py # メッセージ受信時に動作する処理 @client.event async def on_message(message): guild = client.get_guild(SERVER_ID) #スケジュール if message.channel.id == CHANNEL_ID_SCHEDULE: for v in fish_dict.values(): if message.content == '!' + v['name_JA']: msg = v['name_JA'] + ' スケジュール\n' msg += '```' fishing_dates = schedule_calc(v['name_EN']) for fishing_date in fishing_dates: #TODO Herokuに日本語のロケールがないので、曜日を日本語にするなら独自の関数を実行する必要がある msg += fishing_date.strftime('%Y/%m/%d(%a) %H:%M') + '\n' msg += '```' await message.reply(msg) 今後の課題 使用方法の実装 スケジュールの曜日を日本語に(ロケールの関係でめんどくさい) ヘルプ機能の実装(実際の魚の作り方の簡易的なガイドメッセージを出したい) 出現頻度が低い魚を実装 プリフィックス(!) + 魚の名前以外のメッセージを即時削除する 一定時間経過でメッセージが消えるようにする ソースコード botが稼働しているdiscordサーバー 下記リンクからサーバーにJoinすることができます。 連絡先 なにかあれば下記連絡先までお気軽にお問い合わせください。 6futjs9g9xi2r(at)gmail.com 6futjs9g9xi2r#5579 FF14プレイヤー向け 2021/10/7現在以下の3匹のタイマーが実装されています。 紅龍 イラッド・スカーン サプライズエッグ 今後uptimeが1%以下の魚は順次実装していく予定です。 私自身がヌシ釣りの経験が浅いので、サーバーに参加して意見をいただけると幸いです。よろしくお願いいたします。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

JavaScriptでPythonのrange関数を実装する

はじめに これらと同じことをJavaScriptでやる。 for i in range(10): print(i) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] list(range(10)) # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] list(range(1, 11)) # [0, 5, 10, 15, 20, 25] list(range(0, 30, 5)) # [0, 3, 6, 9] list(range(0, 10, 3)) # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9] list(range(0, -10, -1)) # [] list(range(0)) # [] list(range(1, 0)) 仕様 すべて数値型の range(end) と range(start, end[, step]) の引数を受け取る。 渡された引数がない場合はエラーを投げる。 最初に載せたPythonのコードと同じ挙動をする。 実装 範囲などは事前に計算し、Symbol.iterator メソッドを実装したイテラブルなオブジェクトを返す。 /** * @param {number} start * @param {number} end * @param {number} [step = 1] * @returns {{ start: number, end: number, step: number, [Symbol.iterator](): Generator<number, void, unknown> }} */ const range = (start, end, step = 1) => { // range() if (typeof start === 'undefined' && typeof end === 'undefined') { throw new TypeError('range expected at least 1 argument, got 0') } // overload range(end) if (typeof end === 'undefined') { end = start start = 0 } /** * @type {number} */ const max = ( ( step === 0 || (start > end && step > -1) || (start < end && step < 0) ) ? 0 : Math.abs(Math.ceil((end - start) / step)) ) return { start, end, step, *[Symbol.iterator]() { for (let i = 0; i < max; i++) { yield start + step * i } } } } 使い方 以下のように使用する。 for (const i of range(10)) { console.log(i) } // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] const array = [...range(10)] // Array.from(range(10))
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

【NLP】医療系タスクに使える自然言語処理モデル一覧

初めに 今回の記事はSIGNATEの医学論文の自動仕分けチャレンジで使用した医学系のモデルをまとめました。 どのモデルもHuggingfaceから使用できるモデルなので簡単にすぐ使用できます。ざっくりとモデルの概要説明のみ記載しているので詳しい仕組みや学習時のパラメータ設定、モデルのベンチマークなどは論文内の情報を参照してください。 前提知識 まずは、前提として医療系モデルの事前学習に使用されるメジャーなデータセットの解説 ・Pubmed  生命科学や生物医学に関する参考文献や要約を提供する検索エンジン。アメリカ国立衛生研究所のアメリカ国立医学図書館(NLM)が情報検索Entrezシステムの一部としてデータベースを運用しています。 ・PMC  アメリカ合衆国の国立衛生研究所 (NIH) 内の国立医学図書館 (NLM) の部署である国立生物工学情報センター (NCBI) が運営する、生物医学・生命科学のオンライン論文アーカイブです。 ・MIMIC-III  MIMIC-IIIは、2001年から2012年の間にベスイスラエルディーコネス医療センターの救命救急ユニットに滞在した4万人以上の患者に関連する匿名化された健康関連データを含む、大規模で無料で利用できるデータベースです。データベースには、人口統計、重要な情報などの情報が含まれています。 次に用語など ・BLURB(Biomedical Language Understanding and Reasoning Benchmark) 簡単に言うと生物医学に特化したベンチマークを図るためのデータセット。今回使用するモデルはこちらのサイトのベンチマークを参考にしました。 ・NAACL 「NAACL」は世界中の研究者によって定期開催される国際会議で、「ACL」「EMNLP」※3と並び、自然言語処理分野(NLP)でもっとも権威ある国際会議のひとつです。 モデル紹介 BioELECTRA(2021): 概要: NAACL2021で発表されたモデルで、pubmed,PMCの全データで事前学習済みのELECTRA。ELECTRAはBERTが事前学習としてMLMで学習を部分をGANのGeneratorとDiscriminatorで学習するように置き換わっているモデルです。 論文:https://aclanthology.org/2021.bionlp-1.16.pdf 事前学習済みモデル:https://huggingface.co/kamalkraj/bioelectra-base-discriminator-pubmed-pmc-lt 図:ERECTRAの事前学習 SapBERT(2021): 概要: SapBERTはNAACL2021で発表された事前学習モデルの改善手法です。これは少し特殊でモデルではなくすでに事前学習済みのモデルをさらに改善するための手法が提案されています。Umlsデータセットをもとに同音異義語や単語の表記ゆれなどを改善することにより精度改善につながったようです。今回のコンペではSapBERTを適用したPubmedBERTがCV、PublicLBともにダントツで優秀でした。(LBが微妙だったため提出できなかったのが悔やまれる) 論文:https://arxiv.org/abs/2010.11784 事前学習済みモデル:https://huggingface.co/cambridgeltl/SapBERT-from-PubMedBERT-fulltext 図:SapBERT適用後の単語分布 Umls BERT (2021): 概要: NAACL2021で発表されたモデルで、BioBERT, Clinical BERTの発展形モデル。論文内既存のモデルたちは固有コーパスによって専門用語を学習することには成功したが、医者が持っているようなドメイン知識は獲得できていないと述べています。そのためUMLSを導入することにより、単語間の関係性も学習することが可能になることで、既存モデルを超える精度を実現しています。 論文:https://arxiv.org/pdf/2010.10391.pdf 事前学習済みモデル:https://github.com/gmichalo/UmlsBERT 図:Umls BERTの事前学習 RoBERTa-base-PM(2020) 概要: EMNLP2020で発表されたモデルで、Pubmed,PMC,MIMIC-IIIなどのデータセットを事前学習済みのRoBERTaモデル。 Base,distil,largeの3パターンと学習データセットごとにそれぞれモデルが用意されています。 論文:https://aclanthology.org/2020.clinicalnlp-1.17.pdf 事前学習済みモデル:https://github.com/facebookresearch/bio-lm
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

Layer付きのAWS Lambda functionをServerlessFrameworkでローカル実行するとLayerが参照できない

概要 タイトルの通り、Layerを用いて一部ライブラリを共通化しているAWS Lambda functionを、serverlessでローカル実行しようとした時につまずいたので備忘録として残しておく。 経緯 最近、趣味でAWS Lambda上で動かすpythonプログラムを作成していて、デプロイが楽になるとのことなのでserverlessを導入していた。 その際、functionを複数作成しており、一部ライブラリはLayerを用いて共通化していた。 作成したプログラムの構成は以下。 プロジェクトルートディレクトリ ├─function1 │ └─src │ └─handler.py ├─function2 │ └─src │ └─handler.py ├─layer │ ├─layer1 │ │ └─layer1のpythonライブラリ・ソース群 │ └─layer2 │ └─layer2のpythonライブラリ・ソース群 └─serverless.yml また、serverless.ymlの記述内容は以下。 (本記事に関係ない細かな設定については省略している) serverless.yml layers: layer1: path: layer/layer1 name: ${self:service}-layer1 compatibleRuntimes: - python3.7 allowedAccounts: - "*" layer2: path: layer/layer2 name: ${self:service}-layer2 compatibleRuntimes: - python3.7 allowedAccounts: - "*" functions: function1: handler: function1/src/handler.lambda_handler layers: - { Ref: Layer1LambdaLayer } - { Ref: Layer2LambdaLayer } function2: handler: function2/src/handler.lambda_handler layers: - { Ref: Layer1LambdaLayer } - { Ref: Layer2LambdaLayer } この設定でsls deployコマンドを実行すると、AWSへのデプロイは正常終了し、Lambda上でのpythonプログラムの起動もできるのだが、 何故かsls invoke local --function {function名}コマンドによるローカル実行は下記エラーが発生してうまくいっていなかった。 ModuleNotFoundError: No module named '{layerに配置したライブラリ名}' どうやら、ローカル実行するとlayerに配置したライブラリが参照できていないらしい。 AWSにデプロイするとちゃんと動くのに。解せぬ。 解決策 とにかくserverlessでローカル実行するとlayerをうまく参照できていないことがわかったので、コマンド実行時にオプションなどで設定を追加できないか公式ドキュメントを調べてみた。 …うーん、それっぽいオプションは見当たらない。 というかデータ入力の方法についての記述が大半でLayerについての説明なんかは皆無なんだなこれが。 とりあえずオプションを一通り試してみるかー、と半ばやけくそになりかけた時、気になる記述を見つけた。 --docker Enable docker support for NodeJS/Python/Ruby/Java. Enabled by default for other runtimes. 訳)--dockerオプションは、NodeJS / Python / Ruby / JavaのDockerサポートを有効にします。他のランタイムではデフォルトで有効になっています。 へー。pythonだとDockerはデフォルトで無効化されてるんだー。へー。 …ひょっとしてこれじゃね? serverlessではなくAWS CLIを使ってLambda functionをローカルで実行するときは、ローカルのDocker上にLambdaの実行環境が作成されそこでfunctionが実行されることは知っていたのでそこから連想できた。 とにもかくにも、--dockerオプションを追加してコマンドをたたいてみる。 >sls invoke local --function function1 --docker Serverless: Running "serverless" installed locally (in service node_modules) (略) {"statusCode":200,"body":"{\"message\": \"finished . !\"}"} 動いた!!! どうやらlayerが参照できなかったのは、docker上で動かしてなかったからだった模様。 作成したfunctionを正しく動かすためにはほぼ本番に近いdocker上の環境で動かす必要があるのに、その設定はデフォルトで無効になっているという…。 いやまあ毎回docker上で動かしていると実行が遅くなったりするし、Lambda functionはLayer使わないものが大半だったりするとデフォルト無効の意義も理解はできるんだが…。 ちょいと公式ドキュメントが分かりづらすぎやしませんかね? 結論 Layerでライブラリを共通化したLambda functionがserverlessでローカル実行できない、といった事象に遭遇した方はコマンドに--dockerオプションを付け加えてみてはいかがだろうか。
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む

ABC220 A~D問題 ものすごく丁寧でわかりやすい解説 python 灰色~茶色コーダー向け #AtCoder

ABC220(AtCoder Beginner Contest 220) A~D問題の解説記事です。 灰色~茶色コーダーの方向けに解説しています。 その他のABC解説、動画などは以下です。 A - Find Multiple 今回のA問題は「やや難」です。 普段のA問題はもっと簡単なので、今回A問題が解けなかった人も落ち込まずに競技プログラミングを続けてください。 公式解説の方法でも解けますが思いつくのが難しいです。 A~Bまでの数について順に「Cの倍数か」=「Cで割った余りが0か」を確認すればOKです。 A~Bまで確認するときは以下のように書きます。 for x in range(A,B+1): 上記のように書くことでx=A~Bまで順に処理ができます。 range(A,B)ではなく、range(A,B+1)であることに注意してください。 xをCで割った余りは x%C で確認できます。 これが0かどうか、if文で判定します。 x%C=0ならばxを出力して終了します。 プログラムを途中で終了するときは exit() と書きます。 入力の受け取り、出力がわからない方は以下の記事を参考にしてください。 【提出】 # 入力の受け取り A,B,C=map(int, input().split()) # x=A~Bまで for x in range(A,B+1): # xがCの倍数=xをCで割った余りが0の場合 if x%C==0: # xを出力 print(x) # プログラムを終了 exit() # A~BまででCの倍数がなければここに到達する # -1を出力 print(-1) B - Base K K進数で書かれた数を10進数へ変換するには 右からi桁目×Kのi乗 を計算して足し算します。 例として3進法の「201」を10進数へ変換しましょう。 右から0桁目=1:1×3^0=1×1=1 右から1桁目=0:0×3^1=0×3=0 右から2桁目=2:2×3^2=2×9=18 よって1+0+18=19となります。 なぜこのように計算できるかは「3進数の定義より」としか言いようがないのですが、詳しく知りたい人は以下のページが比較的わかりやすいかと思います。 理屈がしっかりわかっていなくてもこの問題を解くことはできますので、興味がある人だけ読んでください。 実装についてはK進数→10進数へ変換する関数を作ると楽です。 関数の内容は以下です。 (1)x(文字列)を受け取る (2)右端から各桁の数を確認するため、反転する  (x[::-1]と書くことで反転できます) (3)xの右から順にi桁目×(k^i)を計算して足す (4)結果を返す 関数ができたらA,Bを10進数へ変換して掛け算するだけです。 【提出】 # 入力の受け取り K=int(input()) # A,Bは文字列として受け取り A,B=map(str, input().split()) # 10進法へ変換する関数 def convert_ten(x): # xを反転 x=x[::-1] # 結果の格納用 result=0 # i=0~(xの桁数) for i in range(len(x)): # xのi桁目*(K^i) intでxのi桁目を整数へ変換 result+=int(x[i])*(K**i) # 結果を返す return result # A,Bを10進法へ変換 A_ten=convert_ten(A) B_ten=convert_ten(B) # 答えの出力 print(A_ten*B_ten) C - Long Sequence まず「Aの塊が最低いくつ必要か」考えましょう。 これは「X÷(Aの合計)の商」で計算できます。 さらに(「X÷(Aの合計)の商」)個Aを足した合計は(Aの合計)×「X÷(Aの合計)の商」となることがわかります。 Aの中身はN個なので、BのN×(「X÷(Aの合計)の商」)項目までの合計が(Aの合計)×「X÷(Aの合計)の商」となることがわかるわけです。 あとはXを超えるまでAの値を順に足していけばよいです。 具体例で考えましょう。 「例」 N:3 A:1 2 3 X:20 Aの合計は1+2+3=6です 「X÷(Aの合計)の商」=20÷6=3となりますから、まずAの塊が3つ必要であることがわかります。 A3つの合計は(1+2+3)+(1+2+3)+(1+2+3)=18です。 ここまでの項数はN×「X÷(Aの合計)の商」=3×3=9です。つまりBの9項目までの和が18です。 あとはXを超えるまでAの要素を順に足し算していくだけです。 Bの10項目(+A[0]):18+1=19 Bの11項目(+A[1]):19+2=21 11項目でX(=20)を超えたので、答えは11となります。 【提出】 # 入力の受け取り N=int(input()) A=list(map(int, input().split())) X=int(input()) # Aの合計 A_sum=sum(A) # XをA_sumで割った商 syou=X//A_sum # k:N×商 k=N*syou # B_sum:Aの合計×商 B_sum=A_sum*syou # i=0~N-1まで for i in range(N): # B_sumにA[i]を足す B_sum+=A[i] # kにプラス1 k+=1 # X<B_sumになったら if X<B_sum: # 答えを出力 print(k) # 終了 exit() D - FG operation それぞれの操作が終わった時、左端の数として0~9がいくつありうるか管理します。 具体例で説明します。 N:5 A:0 1 2 3 4 まず左端にある数の個数を管理する配列を作ります。 count=[0,0,0,0,0,0,0,0,0,0] 最初は左端に0がひとつあります。ゆえにcount[0]は1です。 count=[1,0,0,0,0,0,0,0,0,0] 左端からの2つ、x=A[0]=0とy=A[1]=1でそれぞれ足し算、掛け算を行います。 (x+y)%10=(0+1)%10=1 (x×y)%10=(0×1)%10=0 これでA[0]とA[1]に関する操作の終了時点で左端にありうるのは1が1つ、0が1つであることがわかります。これをまたcountに記録します。 count=[1,1,0,0,0,0,0,0,0,0] 次にy=A[2]=2としてまた操作を行います。 xは左端の数なので、左端にありうる数の数をcountから確認します。xとしてありうるのは 0が1個 1が1個 です。 x=0の場合 (x+y)%10=(0+2)%10=2 (x×y)%10=(0×2)%10=0 x=1の場合 (x+y)%10=(1+2)%10=3 (x×y)%10=(1×2)%10=2 ゆえにA[2]までの操作終了時点で左端にありうるのは 0が1個 2が2個 3が1個 です。countに記録します。 count=[1,0,2,1,0,0,0,0,0,0] y=A[3]=3としてまた操作を行います。 例によってx、つまり左端にありうる数の数をcountから確認します。 xとしてありうるのは 0が1個 2が2個 3が1個 です。 x=0の場合 (x+y)%10=(0+3)%10=3 (x×y)%10=(0×3)%10=0 x=2の場合 (x+y)%10=(2+3)%10=5 (x×y)%10=(2×3)%10=6 x=3の場合 (x+y)%10=(3+3)%10=6 (x×y)%10=(3×3)%10=9 気をつけるべきなのはx=2のパターンが「2個」存在することです。 x=2の場合からは5,6が出ていますがこれは2個分として数えます。 ゆえにA[3]までの操作終了時点で左端にありうるのは 0が1個 3が1個 5が2個(1×2) 6が3個(1×2+1) 9が1個 となります。 またcountに記録します。 count=[1,0,0,1,0,2,3,0,0,1] このように左端にありうる数を数えていけば最終的にcountが答えになります。 実装ではx=0~9とy=A[i]の計算結果を確認し、count[x]の個数だけ左端にありうる数として数えるという操作を繰り返し行えば良いです。 【提出】 # 入力の受け取り N=int(input()) A=list(map(int, input().split())) # 余りの定義 mod=998244353 # 左端に0~9がいくつあるかカウントする配列 count=[0]*10 # A[0]が1個ある count[A[0]]=1 # i=1~N-1まで for i in range(1,N): # 新しいカウント用配列 new_count=[0]*10 # y=0~9まで for x in range(10): y=A[i] # x+yの結果 plus=(x+y)%10 # x*yの結果 times=(x*y)%10 # count[x]の個数だけ新しいカウントに追加 new_count[plus]+=count[x] new_count[times]+=count[x] # 余りを取る new_count[plus]%=mod new_count[times]%=mod # countに更新 count=new_count # 答えの出力 for x in count: print(x) 【広告】 「AtCoder 凡人が『緑』になるための精選50問詳細解説」 AtCoderで緑になるための典型50問をひくほど丁寧に解説した本(kindle)、pdf(booth)を販売しています。 値段:100円(Kindle Unlimited対象) kindle:https://www.amazon.co.jp/gp/product/B09C3TPQYV/ booth(pdf):https://sano192.booth.pm/items/3179185 サンプルは以下から。 https://qiita.com/sano192/items/eb2c9cbee6ec4dc79aaf
  • このエントリーをはてなブックマークに追加
  • Qiitaで続きを読む