OpenAI Retro Contestの環境でリプレイ映像を見る

OpenAI Retro Contestの環境構築そのものは既にまとめてくれている方がいて、大変わかりやすかった。この通りにやったら簡単にGym Retro Integrationを動かすことができた。ありがとうございます。

OpenAI Retro Contestの「Gym Retro Integration」でソニック・ザ・ヘッジホッグをプレイする - おおかみ山

ここで作った環境でagentのアルゴリズムを書いていろいろ試すわけだが、その前に動かした結果を確認する方法が必要なので、リプレイを確認する方法をメモしておく。

まずcontestの公式イントロダクションはこちら

https://contest.openai.com/details

こちらに則ってretro_contest.localというwrapperを使ってひとまず動かしてみる。 以下でretro-contestを入れる。

git clone --recursive https://github.com/openai/retro-contest.git
pip install -e "retro-contest/support[docker,rest]"

常に右に動き続けるagentで記録を取る(ランダムだとその場からなかなか進まなくてゲームが終わらないので)

# sonic_simple.py

from retro_contest.local import make

def main():
    env = make(game='SonicTheHedgehog-Genesis', state='LabyrinthZone.Act1', bk2dir='./data')
    obs = env.reset()
    while True:
        action = env.action_space.sample()
        action[7] = 1
        obs, rew, done, info = env.step(action)
        env.render()
        if done:
            break

    env.close()

if __name__ == '__main__':
    main()

makeの第3引数でbk2dirを設定することで、実行時に指定したディレクトリ内に.bk2というバイナリファイルが作成される。

$ mkdir data
$ python sonic_simple.py
$ ls data
SonicTheHedgehog-Genesis-LabyrinthZone-000000.bk2

openai/retroのreadmeにこれをmp4にする方法が書かれている

GitHub - openai/retro: Retro Games in Gym

mp4に変換したいなら以下のスクリプトを使えばOK。

retro/playback_movie.py at master · openai/retro · GitHub

$ git clone https://github.com/openai/retro.git
$ python retro/scripts/playback_movie.py data/SonicTheHedgehog-Genesis-LabyrinthZone-000000.bk2
...いろいろ出力...
$ ls data
SonicTheHedgehog-Genesis-LabyrinthZone-000000.bk2 SonicTheHedgehog-Genesis-LabyrinthZone-000000.mp4

mp4にする必要なくリプレイを見たいだけの場合は以下のスクリプトを実行すれば、学習時と同じ形のウィンドウでリプレイが見られる(再生速度も速い)

# playback.py

import argparse
import sys
import retro

def main(argv=sys.argv[1:]):
    parser = argparse.ArgumentParser(add_help=True)
    parser.add_argument('path', help='path to .bk2 file')
    args = parser.parse_args(argv)

    movie = retro.Movie(args.path)
    movie.step()

    env = retro.make(game=movie.get_game(), use_restricted_actions=retro.ACTIONS_ALL)
    env.initial_state = movie.get_state()
    env.reset()

    while movie.step():
        keys = []
        for i in range(env.NUM_BUTTONS):
            keys.append(movie.get_key(i))
        env.render()
        _obs, _rew, _done, _info = env.step(keys)

if __name__ == '__main__':
    main()

実行

$ python playback.py SonicTheHedgehog-Genesis-LabyrinthZone-000000.bk2

方策勾配法とニューラルネットワークで迷路を学習

DQNで実装したものはネット上でよく見かけるが方策勾配法を使ったものは意外と見つからないのでやってみた。

題材はこちら

第5回 ⽅策勾配法で迷路を攻略|Tech Book Zone Manatee

私はこの連載で強化学習の基本的な実装方法を学んだがとてもわかりやすかった。なので同じ迷路を題材として、↑では離散的に行列で表現されている方策πをニューラルネットワークで置き換えた形にしてみる。

方策勾配法とニューラルネットワークの組み合わせ方としてはこちらを参考にした

深層強化学習:ピクセルから『ポン』 – 前編 | POSTD

3*3の9マスでstart=0(左上)、goal=8(右下)としてstart~goalまで移動させる。状態は迷路内の現在位置で0~7の値(8はgoalなので考えない)、行動は上、右、下、左へ進むことを0~3の値で表す。

先にコードの全体

machine-learning-samples/maze_policy_nn.py at 936b04b6f1ac5ba5c9267d219a92e92cbea4cc31 · y-kamiya/machine-learning-samples · GitHub

ニューラルネットワークのモデル部分はこちら

NUM_HIDDEN_NODES = 32
NUM_STATE = 8
NUM_ACTION = 4
LEARNING_RATE = 0.01

class Net(nn.Module):
    def __init__(self, num_states, num_actions):
        super(Net, self).__init__()
        self.num_states = num_states
        self.num_actions = num_actions

        self.fc1 = nn.Linear(self.num_states, NUM_HIDDEN_NODES)
        self.fc2 = nn.Linear(NUM_HIDDEN_NODES, NUM_HIDDEN_NODES)
        self.fc3 = nn.Linear(NUM_HIDDEN_NODES, self.num_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return F.softmax(self.fc3(x))

class Environment:
    def __init__(self):
        self.model = Net(NUM_STATE, NUM_ACTION)
        self.optimizer = optim.Adam(self.model.parameters(), lr=LEARNING_RATE)
    

全結合層のみで3層。隠れ層のノード数は適当に32にとした。 出力は各行動を取る確率となるようsoftmaxをかけた。ちなみに、softmaxなどを最後にかけてしまうと逆伝播による学習の効率が悪くなる、みたいなことをどこかで読んだ気がするが今回はとりあえず気にしない。

以下に載せるメソッドはすべてEnvironmentのもの

mainから呼ばれる処理

def run(self):
    for episode in range(NUM_EPISODE):
        state = 0

        history = self.run_to_goal()
        self.update_policy(history, episode)

        self.model.eval()
        if episode % 10 == 0:
            self.display_model(episode)

1 episode内での処理は、goalまで動いた結果に基づいてpolicyを更新する、だけ。最後のはただの結果表示で学習には関係ない。それをepisode数分だけ繰り返す。

policyの更新

GAMMA = 0.99

  def update_policy(self, history, episode):
      self.model.train()

      rewards = np.zeros((len(history)))
      targets = np.zeros((len(history), NUM_ACTION))
      for i, entry in enumerate(history):
          rewards[i] = entry[2]
          targets[i] = entry[4]

      discounted_rewards = self.discount_reward(rewards)
      targets = targets * discounted_rewards

      targets.reshape(-1, NUM_ACTION)
      targets = torch.tensor(targets, dtype=torch.float32)

      self.optimizer.zero_grad()
      for i, entry in enumerate(history):
          loss = F.smooth_l1_loss(entry[3], targets[i])
          loss.backward()

      self.optimizer.step()


 def discount_reward(self, rewards):
     discounted_rewards = np.zeros((rewards.size, NUM_ACTION))
     running_add = 0
     for i in range(rewards.size)[::-1]:
         running_add = running_add * GAMMA + rewards[i]
         for j in range(0, NUM_ACTION):
             discounted_rewards[i][j] = running_add

     return discounted_rewards

goalから遠い(startに近い)ほど報酬が小さくなるよう割り引いている。これは参考にしたこちらのページのやり方にならったものだが、今回のようにステップ数が少ない場合はあまり意味がないと思われる。が、とりあえず入れてある。

深層強化学習:ピクセルから『ポン』 – 前編 | POSTD

startからgoalまで動きつつそのhisotryを保存

 def run_to_goal(self):
     state = 0
     history = []

     self.model.eval()

     for step in range(0, NUM_STEPS):
        output = self.model(self.create_input(state))
        props = output.data.numpy()

        action = np.random.choice(range(0, NUM_ACTION), p=props)

        next_state = self.get_next_state(state, action)

        ys = np.zeros(NUM_ACTION)
        ys[action] = 1

        history.append([state, action, 0.0, output, ys])

        if next_state == state:
            break;

        if next_state == GOAL:
            history[-1][2] = 1.0
            break

        state = next_state

     if history[-1][2] == 0.0:
        history[-1][2] = -1.0

     return history


  def create_input(self, state):
      array = np.zeros(NUM_STATE)
      array[state] = 1
      return torch.from_numpy(array).type(torch.FloatTensor)

最大ステップ数はgoalにたどりつくための最小ステップ数である4とした。最後のステップのrewardとして、goalに辿りつけた場合は1.0、辿り着けなかった場合は-1.0を入れる。

create_inputで入力となる状態をone-hotベクトルとして生成しモデルにわたす。出てきた各行動の確率に従って実際に取る行動を決定する。ysは誤差計算の教師データの算出に使う。

state毎に取れるactionは配列で定義し、取れない行動をとった場合は元のマスに留まるようにした。そのような行動はなるべく取らないようにしたいので、同じマスにとどまった場合は即座に終了してreward=-1.0とした。

次のstateを決める部分はこちら

    actions_permitted = [
        [False, True, True, False],
        [False, True, False, True],
        [False, False, True, True],
        [True, True, True, False],
        [False, False, True, True],
        [True, False, False, False],
        [True, False, False, False],
        [True, True, False, False],
    ]

    def get_next_state(self, state, action):
        is_permitted = Environment.actions_permitted[state][action]
        if not is_permitted:
            return state

        if action == 0:
           s_next = state - 3
        elif action == 1:
           s_next = state + 1
        elif action == 2:
           s_next = state + 3
        elif action == 3:
           s_next = state - 1

        if s_next < 0 or NUM_STATE < s_next: 
            return state

        return s_next

5000 episode分学習させた結果がこちら。display_modelによって出力したもので、i番目のtensorがstate iの場合の各行動を取る確率を表している。

tensor([ 0.0440,  0.0387,  0.8735,  0.0438])
tensor([ 0.1760,  0.2273,  0.3344,  0.2623])
tensor([ 0.1618,  0.2812,  0.2996,  0.2574])
tensor([ 0.0127,  0.9614,  0.0133,  0.0126])
tensor([ 0.0033,  0.0041,  0.9905,  0.0021])
tensor([ 0.1505,  0.2378,  0.3876,  0.2241])
tensor([ 0.1632,  0.2882,  0.2861,  0.2624])
tensor([ 0.0005,  0.9991,  0.0003,  0.0001])

goalへ向かう最短距離を通るための行動の確率が高くなっている。また、袋小路になっていて正しいルートでは通らないマスについてはあまり計算されないためそこまで偏りが発生していない。

pytorchでエラー(Leaf variable was used in an inplace operation)

タイトルに書いたエラーが出たのでわかったことをメモ

とりあえず解決に最も有用だった情報はこれ

Leaf variable has been moved into the graph interior - autograd - PyTorch Forums

生成したtensorの要素を直に書き換える処理をした上で、backwardを行うと上記のエラーが出るっぽい。自分のプログラムでもone-hotベクトルの形の入力を作る際に以下のようなことをやっていた。

input = torch.zeros([NUM_STATE], requires_grad=True)
input[state] = 1

これを以下のように書き換えたところエラーは出なくなった

input = np.zeros(NUM_STATE)
input[state] = 1
return torch.from_numpy(input).type(torch.FloatTensor)

調査中にその他調べたこと

エラーが出るのはloss.backward()を呼んだ際で、コード上ではここでエラーが出るようになっている模様。

pytorch/accumulate_grad.cpp at 372d1d67356f054db64bdfb4787871ecdbbcbe0b · pytorch/pytorch · GitHub

以下のページのVariableのところに説明があるが、grad_fnは自分で生成したtensorだとNoneに、計算によって作られたものだとそれを生成したFunctionを参照する値を持つらしい

http://caffe.classcat.com/2017/04/14/pytorch-tutorial-autograd/

また、leaf variableというのは直接生成したtensorのことを指している。

Leaf variable was used in an inplace operation - PyTorch Forums

よって、直接生成したtensorなのにgrad_fnの値がNoneではないためエラーとしているということだった。そしてこれはtensorの要素を書き換えるような処理で発生する。