soccerAIのプログラムレベルのお話(α0.0.2) ①
※これからはプログラムレベルのお話シリーズはsoccerAIでHELIOS倒しちゃうぞ計画として続行いたします。(タイトルは変更しません)
プログラムレベルのお話シリーズ第二回。
今回は前回のlibscipsとは変わって自分が作成・配布しているsoccer AIについて説明しようと思う。
まずまずsoccerAIとは?
soccerAI?なにそれ美味しいの?と思う方もいると思う。なので簡単に説明する。
soccerAIはRobocup大会のシュミレーションリーグに将来的に出場させるために作成されたプログラム
詳しく言うと、この大会は日本(アジア)大会というものがあるが、その中にAIを使っているチームは自分が知っている範囲ではHELIOSのみ。
そして、自分たちjyo_senチームは元秋葉原プログラミング教室の人たちでAIをやっていたからAIをやらなくては教室で学んだことが活かせきれない。
なので、自分はsoccerAIでHELIOS倒しちゃうぞ計画を作成・実行。
その第一歩として前回、libscipsの作成をした。
soccerAI・libscips開発記を見ていない方はこちらから。
soccerAIはこんな感じのものだ。
作ってみる。
本を読む
自分たちは教室から配布されたプログラム等を使い知識は増やしていたが、応用の部分は全くの0。ほぼ何もできないので復習がてらチームメンバーで本を読むことにした。 その本はこれ。
この本はAIの基礎から応用まで幅広く詳しく書かれている本だ。 これをday3まで読んで、応用方法等が少しわかったところで開発を初めた。
※こちらのページは上記の本を読まなくても理解等はできると思いますが、読んだほうがよりわかりやすく理解ができると思います。
実際に作る
今回は、時間はかかるけれども精度が期待できるactor critic法でやっていこうと思う。 まずは、ちょっと前に本のプログラムをわかりやすくしたプログラムを作成したことがあるのでそれをベースにしてやっていこうと思う。
ベースとするプログラム
import time import cuitools import numpy as np import gym from ELAgent import ELAgent from frozen_lake_util import show_q_value from tqdm.gui import tqdm class Actor(ELAgent): def __init__(self, env): super().__init__(epsilon=-1) nrow = env.observation_space.n ncol = env.action_space.n self.actions = list(range(env.action_space.n)) self.Q = np.random.uniform(0, 1, nrow * ncol).reshape((nrow, ncol)) def softmax(self, x): return np.exp(x) / np.sum(np.exp(x), axis=0) def policy(self, s): a = np.random.choice(self.actions, 1, p=self.softmax(self.Q[s])) return a[0] class Critic(): def __init__(self, env): states = env.observation_space.n self.V = np.zeros(states) class ActorCritic(): """このファイルはday3のcode3のActorCriticの実装部分のプログラムを一部改変し、読みやすくし、ファイルを分けたものです。 具体的には? render引数をTrueにすると学習過程がわかるようになりました。 tqdmを使いプログレスバーが表示できるようになりました。 説明が各関数に入っています。 関数の説明 actor_critic.ActorCritic 実行をします。""" def __init__(self, actor_class, critic_class): self.actor_class = actor_class self.critic_class = critic_class def train(self, env, episode_count=1000, gamma=0.9, learning_rate=0.1, render=False, report_interval=50): """actor_critic.train env:環境のデータが格納された変数です。 episode_count:エピソード回数を指定します。default:1000 gamma:割引率を指定します。default:0.9 render:画面に様子を表示するかどうか設定します。default:False report_interval:ログを保存する間隔を設定します。default:50""" actor = self.actor_class(env) critic = self.critic_class(env) actor.init_log() for e in tqdm(range(episode_count)): s = env.reset() done = False while not done: if render: cuitools.reset() env.render() time.sleep(0.01) a = actor.policy(s) n_state, reward, done, info = env.step(a) gain = reward + gamma * critic.V[n_state] estimated = critic.V[s] td = gain - estimated actor.Q[s][a] += learning_rate * td critic.V[s] += learning_rate * td s = n_state else: actor.log(reward) if e != 0 and e % report_interval == 0: pass # actor.show_reward_log(episode=e) return actor, critic def train(): trainer = ActorCritic(Actor, Critic) env = gym.make("FrozenLakeEasy-v0") actor, critic = trainer.train(env, episode_count=100000) show_q_value(actor.Q) actor.show_reward_log() if __name__ == "__main__": train()
独自プログラムに変更
このプログラムはopenAI gymを使っているのでそれを独自のプログラムの部分に変えていく。
そうするとこうなる。
import random import sys import threading import time import numpy as np import env_data def softmax(x): return np.exp(x) / np.sum(np.exp(x), axis=0) def readable_size(size): for unit in ['K', 'M']: if abs(size) < 1024.0: return "%.1f%sB" % (size, unit) size /= 1024.0 size /= 1024.0 return "%.1f%s" % (size, 'GB') class Actor: def __init__(self, env, Q): self.actions = list(range(len(env))) # ballの距離・ballの角度・actionの種類すべてが格納されていて、対応するindexがActor.policyで選ばれる。 if Q is None: self.Q = np.random.uniform(0, 1, ( int((env.max_ball_distance - env.min_ball_distance) / env.ball_distance_interval + 1), int((env.max_ball_angle - env.min_ball_angle) / env.ball_angle_interval + 1), len(env))) else: self.Q = Q print("Q table:" + readable_size(sys.getsizeof(self.Q))) def policy(self, ball_distance, ball_angle): a = np.random.choice(self.actions, 1, p=softmax(self.Q[ball_distance][ball_angle])) return a[0] class Critic: def __init__(self, env, V): # self.Qとほぼ同じ if V is None: self.V = np.zeros(int(((env.max_ball_distance - env.min_ball_distance) / env.ball_distance_interval + 1) * ( (env.max_ball_angle - env.min_ball_angle) / env.ball_angle_interval + 1) * len(env))) else: self.V = V print("V table:" + readable_size(sys.getsizeof(self.V))) class ActorCritic: def __init__(self, actor_class, critic_class): self.actor_class = actor_class self.critic_class = critic_class self.save = 0 def train(self, name, env, data=None,gamma=0.9, learning_rate=0.1, save_interval=500): # Actor Critic法の実装 if data is not None: print(gamma) actor = self.actor_class(env,data["arr_0"]) critic = self.critic_class(env,data["arr_1"]) else: actor = self.actor_class(env, None) critic = self.critic_class(env, None) s = env.reset(name) i = 0 while True: if i % save_interval == 0 and self.save == 0: self.save = 1 np.savez("agent_data_" + name + "_" + str(i), actor.Q, critic.V) elif i % int(save_interval/2) == 0 and self.save == 1: self.save = 0 a = actor.policy(s[0], s[1]) n_state, reward, end_action = env.step(a) gain = reward + gamma * critic.V[n_state[0] * n_state[1]] estimated = critic.V[s[0] * s[1]] td = gain - estimated actor.Q[s[0]][s[1]][end_action] += learning_rate * td if actor.Q[s[0]][s[1]][end_action] > 100: print("WARNING: Q table exceeds the threshold value.") actor.Q[s[0]][s[1]][end_action] /= 100 critic.V[s[0] * s[1]] += learning_rate * td s = n_state i += 1 def train(): # スレッド化して実行 trainer = ActorCritic(Actor, Critic) if len(sys.argv) == 3: tmp = np.load(sys.argv[1]) for i in range(11): env = env_data.env() threading.Thread(target=trainer.train, args=["AI1", env,tmp]).start() else: for i in range(11): env = env_data.env() threading.Thread(target=trainer.train, args=["AI1", env]).start() time.sleep(1) trainer = ActorCritic(Actor, Critic) if len(sys.argv) == 3: tmp = np.load(sys.argv[2]) for i in range(11): env = env_data.env() threading.Thread(target=trainer.train, args=["AI2", env, tmp]).start() else: for i in range(11): env = env_data.env() threading.Thread(target=trainer.train, args=["AI2", env]).start() env.show_log() if __name__ == "__main__": train()
プログラムの解説
main.py
いろいろなimport
まず、いろいろなimportをする。
import random import sys import threading import time import numpy as np import env_data
用途としては
名前 | 説明 |
---|---|
random | 後述するNaNエラーを回避するための応急処置としてrandomを使っている。次のバージョンではなくなる予定 |
sys | リストのメモリ使用量を調べるのに使用。 |
threading | soccerの選手を1プロセスで24体出すために使用。 |
time | あまり意味はないけれど念の為選手登録のときのsleep用。 |
numpy | AIの計算(配列計算やsoftmax関数等)に使用。 |
env_data | openAI gymに似せたプログラム構造になているAIの環境を解析や報酬関数等が設定されている。後述。 |
という感じ。
関数の定義
def softmax(x): return np.exp(x) / np.sum(np.exp(x), axis=0) def readable_size(size): for unit in ['K', 'M']: if abs(size) < 1024.0: return "%.1f%sB" % (size, unit) size /= 1024.0 size /= 1024.0 return "%.1f%s" % (size, 'GB')
ここで、softmax関数と後述するリストのメモリ使用量の表示をわかりやすくするためのプログラムを定義する。
actor定義
class Actor: def __init__(self, env, Q): self.actions = list(range(len(env))) # ballの距離・ballの角度・actionの種類すべてが格納されていて、対応するindexがActor.policyで選ばれる。 if Q is None: self.Q = np.random.uniform(0, 1, ( int((env.max_ball_distance - env.min_ball_distance) / env.ball_distance_interval + 1), int((env.max_ball_angle - env.min_ball_angle) / env.ball_angle_interval + 1), len(env))) else: self.Q = Q print("Q table:" + readable_size(sys.getsizeof(self.Q)))
actor critic法のactorの部分を定義する。 何をやっているかというと、
self.actionsに後述のenv_data内のクラスが入った引数envの要素数のindexを作成している。
このenv_data内のクラスはlenを使うとAIが実行できるactionの数を返してくれるスグレモノ。
中身は
action数が8この場合
len(env) = 8
self.actions = [0,1,2,3,4,5,6,7]
になる。
引数QがNoneのときはnp.random.uniformでランダムの0〜1の数字のリストを作成する。
このプログラムはAIの学習データの保存、復元ができるようになっている。
QがNone以外のときは学習データがQに入っているのでself.QにQを入れるというプログラムになっている。
ランダムのリストの形式はこんな感じ。
agentとボールの距離を指定した分割量で分割して、ある程度Q tableを大まかにしているindex
agentとボールの角度を指定した分割量で分割して、ある程度Q tableを大まかにしているindex
actionの個数入っているindex
こんなことをやっている。
acotr.policy定義
def policy(self, ball_distance, ball_angle): a = np.random.choice(self.actions, 1, p=softmax(self.Q[ball_distance][ball_angle])) return a[0]
このプログラムはactionからしていのQ tableの確率で選択するシステムになっている。
例えば
agentとボールの距離を指定した分割量で分割した数字が2
agentとボールの角度を指定した分割量で分割した数字が3
の場合は
self.Q[2][3]に書かれている各actionの確率でactionが選択される。
という感じ。
critic定義
class Critic: def __init__(self, env, V): # self.Qとほぼ同じ if V is None: self.V = np.zeros(int(((env.max_ball_distance - env.min_ball_distance) / env.ball_distance_interval + 1) * ( (env.max_ball_angle - env.min_ball_angle) / env.ball_angle_interval + 1) * len(env))) else: self.V = V print("V table:" + readable_size(sys.getsizeof(self.V)))
コメントにあるとおりactorとほぼ同じプログラムなので説明は割愛。
ActorCritic定義
class ActorCritic: def __init__(self, actor_class, critic_class): self.actor_class = actor_class self.critic_class = critic_class self.save = 0
やっていることは actor_classをself.actor_classに入れる
critic_class同様にやる
self.save(学習データを保存しているかどうか監視して2重保存になってファイルが壊れないようにする変数)を0(していない)にする
学習部分の定義
def train(self, name, env, data=None,gamma=0.9, learning_rate=0.1, save_interval=500): # Actor Critic法の実装 if data is not None: print(gamma) actor = self.actor_class(env,data["arr_0"]) critic = self.critic_class(env,data["arr_1"]) else: actor = self.actor_class(env, None) critic = self.critic_class(env, None) s = env.reset(name) i = 0 while True: if i % save_interval == 0 and self.save == 0: self.save = 1 np.savez("agent_data_" + name + "_" + str(i), actor.Q, critic.V) elif i % int(save_interval/2) == 0 and self.save == 1: self.save = 0 a = actor.policy(s[0], s[1]) n_state, reward, end_action = env.step(a) gain = reward + gamma * critic.V[n_state[0] * n_state[1]] estimated = critic.V[s[0] * s[1]] td = gain - estimated actor.Q[s[0]][s[1]][end_action] += learning_rate * td if actor.Q[s[0]][s[1]][end_action] > 100: print("WARNING: Q table exceeds the threshold value.") actor.Q[s[0]][s[1]][end_action] /= 100 critic.V[s[0] * s[1]] += learning_rate * td s = n_state i += 1
学習部分の定義をしている。
引数
まず引数から。
名前 | 既定値 | 説明 |
---|---|---|
name | - | agentをサッカーサーバーに登録するときの名前を指定 |
env | - | env_data内のクラスが入った変数を指定 |
data | None | 任意。学習済みデータを指定する。 |
gamma | 0.9 | 任意。割引率を指定。 |
learning_rate | 0.1 | 任意。学習する割合を指定。 |
save_interval | 500 | 任意。学習済みデータを保存する間隔をepisodeで指定。 |
詳しい説明は割愛。
学習済みデータの読み込み&actorクラスとcriticクラスを呼び出し
if data is not None: print(gamma) actor = self.actor_class(env,data["arr_0"]) critic = self.critic_class(env,data["arr_1"]) else: actor = self.actor_class(env, None) critic = self.critic_class(env, None)
※print(gamma)
は特に意味はない。デバッグのやつを取り忘れたやつ。
上記の引数の説明の通り、dataがNoneではない場合(学習済みデータを読み込む場合)はactorクラスとcriticクラスの第二引数を学習済みデータにすると、学習済みデータからやってくれるプログラム。詳しくはactor定義を参照。
環境のリセット(サッカーサーバーにagent登録)
s = env.reset(name)
i = 0
agentをサッカーサーバーに登録する。openAI gymで言うenv.reset()
学習プログラム
while True: if i % save_interval == 0 and self.save == 0: self.save = 1 np.savez("agent_data_" + name + "_" + str(i), actor.Q, critic.V) elif i % int(save_interval/2) == 0 and self.save == 1: self.save = 0 a = actor.policy(s[0], s[1]) n_state, reward, end_action = env.step(a) gain = reward + gamma * critic.V[n_state[0] * n_state[1]] estimated = critic.V[s[0] * s[1]] td = gain - estimated actor.Q[s[0]][s[1]][end_action] += learning_rate * td if actor.Q[s[0]][s[1]][end_action] > 100: print("WARNING: Q table exceeds the threshold value.") actor.Q[s[0]][s[1]][end_action] /= 100 critic.V[s[0] * s[1]] += learning_rate * td s = n_state i += 1
Ctrl+Cでプログラムを停止させるまでは学習し続けるようにしてある。
if i % save_interval == 0 and self.save == 0: self.save = 1 np.savez("agent_data_" + name + "_" + str(i), actor.Q, critic.V) elif i % int(save_interval/2) == 0 and self.save == 1: self.save = 0
はepisodeが学習済みデータの保存の回数になってかつ他のagentが保存作業をやっていない場合はデータの保存を行うプログラム。
a = actor.policy(s[0], s[1]) n_state, reward, end_action = env.step(a)
このプログラムはagentの情報から行動を決定、env.step
で行動を渡し、実際に行動するプログラム。
gain = reward + gamma * critic.V[n_state[0] * n_state[1]] estimated = critic.V[s[0] * s[1]] td = gain - estimated actor.Q[s[0]][s[1]][end_action] += learning_rate * td if actor.Q[s[0]][s[1]][end_action] > 100: print("WARNING: Q table exceeds the threshold value.") actor.Q[s[0]][s[1]][end_action] /= 100 critic.V[s[0] * s[1]] += learning_rate * td s = n_state i += 1
これは、いろいろな計算*1をして、データの更新を行うプログラム。
if actor.Q[s[0]][s[1]][end_action] > 100: print("WARNING: Q table exceeds the threshold value.") actor.Q[s[0]][s[1]][end_action] /= 100
この部分はQ tableが異常値になってエラーになってしまい止まってしまう現象を止めるための応急処置でしきい値を設定してしきい値を超えたら、しきい値以下の数字に戻すという仕組みになっている。次のアップデートで消去予定。
エラーの内容
Traceback (most recent call last): File "/home/username/.pyenv/versions/3.6.4/lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run() File "/home/username/.pyenv/versions/3.6.4/lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "main.py", line 59, in train a = actor.policy(s[0], s[1]) File "main.py", line 34, in policy p=softmax(self.Q[ball_distance][ball_angle])) File "mtrand.pyx", line 920, in numpy.random.mtrand.RandomState.choice ValueError: probabilities contain NaN
soccerAIのメイン部分はこんな感じ。次はenv_dataを説明する。
長すぎるのでページ分割します。
次はこちらです。
*1:計算の詳細については本を買って読んでね(^^)