- 投稿日:2022-02-24T21:37:35+09:00
DQNを使用したfx取引のコードの説明
本記事について 上記のリポジトリのDQNエージェントのコードの一部を解説します。 forex-tradingのDQNエージェントは以下に沿って行動します。 1 データの読み込み 事前に保存したデータを読み込みデータの9割をトレインデータに1割をテストデータにします。 self.x = np.load("data/x.npy") y = np.load("data/target.npy") self.low = y[:, :, 2].reshape((self.x.shape[0], -1)) self.high = y[:, :, 1].reshape((self.x.shape[0], -1)) self.y = y[:, :, 0].reshape((self.x.shape[0], -1)) self.atr = np.load("data/atr.npy").reshape((self.x.shape[0], -1)).astype(np.int32) self.train_step = np.arange(0, int(self.x.shape[1] * 0.9)) self.test_step = np.arange(self.train_step[-1], int(self.x.shape[1])) 2 Modelの作成 TPUを使う場合ストラテジーを作成し、GPUを使う場合mixed_precisionを使い、CPUの場合何もしない。 if self.use_device == "tpu": try: resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='') tf.config.experimental_connect_to_cluster(resolver) # This is the TPU initialization code that has to be at the beginning. tf.tpu.experimental.initialize_tpu_system(resolver) self.strategy = tf.distribute.TPUStrategy(resolver) except: self.use_device = "cpu" elif self.use_device == "gpu": from tensorflow.keras.mixed_precision import experimental as mixed_precision policy = mixed_precision.Policy('mixed_float16') mixed_precision.set_policy(policy) フーバー損失関数を作成する def loss_function(self): def loss(q_backup, q): k = 2 error = q_backup - q loss = tf.where(tf.abs(error) <= k, error ** 2 * 0.5, 0.5 * k ** 2 + k * (tf.abs(error) - k)) loss = tf.reduce_mean(loss) return loss return loss モデルを作成し、要約を表示します。 def _build_model(self, lr): loss = self.loss_function() if self.dueling: dqn_network.dueling = True model = dqn_network.network.build_model(self.model_name, self.x.shape[-2:], self.action_size) model.compile( tf.keras.optimizers.Adam(lr, clipnorm=1.), loss=loss, steps_per_execution=100 ) return model def build_model(self, lr=1e-4): if self.use_device == "tpu": with self.strategy.scope(): self.model = self._build_model(lr) self.target_model = tf.keras.models.clone_model(self.model) self.target_model.set_weights(self.model.get_weights()) else: self.model = self._build_model(lr) self.target_model = tf.keras.models.clone_model(self.model) self.target_model.set_weights(self.model.get_weights()) self.model.summary() 3 トレーニングデータを生成する このコードでは探査を不要にするために全てのアクションに対応する報酬を割り振ります。そして、pip_scaleを1以上にすることでオーバーフィットの度合いを制限することができます。 def train_data(self): states, returns, new_states, old_actions = [], [], [], [] h, h_ = 0, self.train_step[-1] n = self.n s = self.s for s in range(self.x.shape[0]): df = self.x[s, h:h_ - n].copy() trend = self.y[s, h:h_] buy = np.array([trend[i + n] - trend[i] for i in range(len(trend) - n)]).reshape((-1,)) scale = np.quantile(abs(buy), 0.99) / 1 buy = np.clip(buy / scale, -1, 1) spread = np.quantile(self.atr[s], 0.25) / scale spread = np.clip(spread, 0.02, None) * self.pip_scale spread = np.round(spread, 2) buy *= self.pip_scale sell = -buy pip = np.zeros((len(trend) - n, self.action_size, self.action_size)) b = 0 if self.action_type == 2 else 1 s = 0 if self.action_type == 1 else 1 pip[:, 0, 0] = buy * b pip[:, 0, 1] = (sell - spread) * s pip[:, 1, 0] = (buy - spread) * b pip[:, 1, 1] = sell * s if self.action_size == 3: pip[:, 2, 0] = buy - spread pip[:, 2, 1] = sell - spread states.append(df[:-self.n]) returns.append(pip[:-self.n]) new_states.append(np.roll(df, -self.n, axis=0)[:-self.n]) concat = np.concatenate self.states, self.returns, self.new_states = \ np.array(states), np.array(returns), np.array(new_states) self.returns = np.round(self.returns, 2).astype(np.float32) 3 トレーニングの実行 ターゲットQ値の計算をし、配列にnanが含まれていないか確認します。 def target_q(self, returns, target_q, target_a): if self.train_loss: target_a = np.argmax(target_a, -1) rr = range(len(returns)) returns[:, 0, 0] += self.gamma * target_q[rr, 0, target_a[rr, 0]] returns[:, 0, 1] += self.gamma * target_q[rr, 1, target_a[rr, 1]] returns[:, 1, 0] += self.gamma * target_q[rr, 0, target_a[rr, 0]] returns[:, 1, 1] += self.gamma * target_q[rr, 1, target_a[rr, 1]] if self.action_size == 3: returns[:, 0, 2] += self.gamma * target_q[rr, 2, target_a[rr, 2]] returns[:, 1, 2] += self.gamma * target_q[rr, 2, target_a[rr, 2]] returns[:, 2, 0] += self.gamma * target_q[rr, 0, target_a[rr, 0]] returns[:, 2, 1] += self.gamma * target_q[rr, 1, target_a[rr, 1]] returns[:, 2, 2] += self.gamma * target_q[rr, 2, target_a[rr, 2]] assert np.mean(np.isnan(returns) == False) == 1 return returns def _train(self, epoch=100, s=0, batch_size=2056): assert isinstance(s, int) ind = self.ind # ランダムなインデックス # データをシャッフルする。 states, new_states, returns = \ self.states[s][ind].copy(), self.new_states[s][ind].copy(), self.returns[s][ind].copy() # 最初のトレーニングではgammaをゼロにする。 if self.train_loss: target_q = self.target_model.predict(new_states, 102800) else: target_q = np.zeros((len(returns), self.action_size, self.action_size), np.float32) for _ in range(epoch): # 報酬値を初期化する returns = self.returns[s][ind].copy() noise = np.random.randn(*states.shape) * 0.1 # ノイズを追加する target_a = self.model.predict(new_states + noise, 102800) returns = self.target_q(returns, target_q, target_a) h = self.model.fit(states + noise, returns, batch_size, validation_split=0.2) self.train_loss.extend(h.history["loss"]) self.val_loss.extend(h.history["val_loss"]) # train_lossの要素数が200以上の時にのみ評価を実行する if len(self.train_loss) >= 200: pips, profits, _, _, _ = self.trade(s, self.test_step[0] - 11513, self.test_step[0], train=True) self.train_rewards.append(np.sum(pips)) pips, profits, _, _, _ = self.trade(s, self.test_step[0], self.test_step[0] + 960 * 8, train=True) self.test_rewards.append(np.sum(pips)) # 取引数を重視したパフォーマンス指標の作成 acc = np.mean(pips > 0) len_pip = len(pips[pips > 0]) * np.clip(acc, 0, 0.75) * 2 total_win = np.sum(pips[pips > 0]) total_lose = np.sum(pips[pips < 0]) ev = \ (np.mean(pips[pips > 0]) * acc + np.mean(pips[pips < 0]) * (1 - acc)) / abs(np.mean(pips[pips < 0])) ev = np.clip(ev, 0, 0.75) / 0.75 rr = np.clip(total_win / abs(total_lose), 0, 2.5) / 2.5 acc /= 0.7 self.max_profit /= self.account_size self.max_pip = (rr + ev + acc) * len_pip self.max_pip = 0 if np.isnan(self.max_pip) else self.max_pip self.test_pip.append(self.max_pip) self.test_profit.append(self.max_profit) if self.max_pips <= self.max_pip: self.best_w = self.model.get_weights() self.max_profits = self.max_profit self.max_profits = np.maximum(self.max_profit, self.max_profits) self.max_pips = np.maximum(self.max_pip, self.max_pips) plt.figure(figsize=(20, 5), dpi=100) plt.subplot(1, 2, 1) plt.plot(self.train_rewards) plt.subplot(1, 2, 2) plt.plot(self.test_rewards) plt.show() print(f"profits = {self.max_profit}, max profits = {self.max_profits}\n" f"pips = {self.max_pip}, max pip = {self.max_pips}") 40回のトレーニングごとにターゲットウエイトをアップデートする。これを15回繰り返す。 def train(self, epoch1=40, epoch2=15, batch_size=2056, save=True, agent_name="dqn", risk=.04): self.risk = risk for _ in range(epoch2): clear_output() plt.figure(figsize=(10, 5)) plt.plot(self.train_loss) plt.plot(self.val_loss) plt.title('Model loss') plt.ylabel('Loss') plt.xlabel('Epoch') plt.legend(['Train', 'Validation'], loc='upper left') plt.show() self._train(epoch1, self.s, batch_size) self.target_model.set_weights(self.model.get_weights())
- 投稿日:2022-02-24T04:44:05+09:00
強化学習を使ってfx取引を学習する
作ったもの fxの過去のデータを利用して強化学習を使って自動取引を学習させること目標にします。詳しいコードは上記のリポジトリを参照してください。 データの準備 Mettrader5をインストールします デモアカウントかリアルアカウントを作成します pythonでMetatrader5パッケージをインストールします pip install Metatrader5 そして以下のコードを実行します。 cd data python gen_data.py 実行 from agent import dqn agent = dqn.Agent(model_name="efficientnet_b0", s=5, action_type=3, pip_scale=1, n=1, loss_cut=False, use_device="tpu", dueling=False) agent.run() agent.plot_result(w=self.best_w, risk=0.04, s=self.s) 結果 これは違うデータによるものです