kumitatepazuru's blog

中学生のメモブログ。みんなの役に立ちたい。

soccerAIのプログラムレベルのお話(α0.0.2) ①

f:id:kumitatepazuru:20200607192843p:plain ※これからはプログラムレベルのお話シリーズはsoccerAIでHELIOS倒しちゃうぞ計画として続行いたします。(タイトルは変更しません)
プログラムレベルのお話シリーズ第二回。 今回は前回のlibscipsとは変わって自分が作成・配布しているsoccer AIについて説明しようと思う。

まずまずsoccerAIとは?

soccerAI?なにそれ美味しいの?と思う方もいると思う。なので簡単に説明する。

soccerAIはRobocup大会のシュミレーションリーグに将来的に出場させるために作成されたプログラム

詳しく言うと、この大会は日本(アジア)大会というものがあるが、その中にAIを使っているチームは自分が知っている範囲ではHELIOSのみ。

そして、自分たちjyo_senチームは元秋葉原プログラミング教室の人たちでAIをやっていたからAIをやらなくては教室で学んだことが活かせきれない。

なので、自分はsoccerAIでHELIOS倒しちゃうぞ計画を作成・実行。

その第一歩として前回、libscipsの作成をした。

soccerAI・libscips開発記を見ていない方はこちらから。

kumitatepazuru.hatenablog.com

soccerAIはこんな感じのものだ。

作ってみる。

本を読む

自分たちは教室から配布されたプログラム等を使い知識は増やしていたが、応用の部分は全くの0。ほぼ何もできないので復習がてらチームメンバーで本を読むことにした。 その本はこれ。

www.amazon.co.jp

この本はAIの基礎から応用まで幅広く詳しく書かれている本だ。 これをday3まで読んで、応用方法等が少しわかったところで開発を初めた。

※こちらのページは上記の本を読まなくても理解等はできると思いますが、読んだほうがよりわかりやすく理解ができると思います。

実際に作る

今回は、時間はかかるけれども精度が期待できるactor critic法でやっていこうと思う。 まずは、ちょっと前に本のプログラムをわかりやすくしたプログラムを作成したことがあるのでそれをベースにしてやっていこうと思う。

github.com

ベースとするプログラム

import time

import cuitools
import numpy as np
import gym
from ELAgent import ELAgent
from frozen_lake_util import show_q_value
from tqdm.gui import tqdm


class Actor(ELAgent):

    def __init__(self, env):
        super().__init__(epsilon=-1)
        nrow = env.observation_space.n
        ncol = env.action_space.n
        self.actions = list(range(env.action_space.n))
        self.Q = np.random.uniform(0, 1, nrow * ncol).reshape((nrow, ncol))

    def softmax(self, x):
        return np.exp(x) / np.sum(np.exp(x), axis=0)

    def policy(self, s):
        a = np.random.choice(self.actions, 1,
                             p=self.softmax(self.Q[s]))
        return a[0]


class Critic():

    def __init__(self, env):
        states = env.observation_space.n
        self.V = np.zeros(states)


class ActorCritic():
    """このファイルはday3のcode3のActorCriticの実装部分のプログラムを一部改変し、読みやすくし、ファイルを分けたものです。

        具体的には?
        render引数をTrueにすると学習過程がわかるようになりました。
        tqdmを使いプログレスバーが表示できるようになりました。
        説明が各関数に入っています。

        関数の説明
        actor_critic.ActorCritic
        実行をします。"""

    def __init__(self, actor_class, critic_class):
        self.actor_class = actor_class
        self.critic_class = critic_class

    def train(self, env, episode_count=1000, gamma=0.9,
              learning_rate=0.1, render=False, report_interval=50):
        """actor_critic.train
                env:環境のデータが格納された変数です。
                episode_count:エピソード回数を指定します。default:1000
                gamma:割引率を指定します。default:0.9
                render:画面に様子を表示するかどうか設定します。default:False
                report_interval:ログを保存する間隔を設定します。default:50"""
        actor = self.actor_class(env)
        critic = self.critic_class(env)

        actor.init_log()
        for e in tqdm(range(episode_count)):
            s = env.reset()
            done = False
            while not done:
                if render:
                    cuitools.reset()
                    env.render()
                    time.sleep(0.01)
                a = actor.policy(s)
                n_state, reward, done, info = env.step(a)

                gain = reward + gamma * critic.V[n_state]
                estimated = critic.V[s]
                td = gain - estimated
                actor.Q[s][a] += learning_rate * td
                critic.V[s] += learning_rate * td
                s = n_state

            else:
                actor.log(reward)

            if e != 0 and e % report_interval == 0:
                pass
                # actor.show_reward_log(episode=e)

        return actor, critic


def train():
    trainer = ActorCritic(Actor, Critic)
    env = gym.make("FrozenLakeEasy-v0")
    actor, critic = trainer.train(env, episode_count=100000)
    show_q_value(actor.Q)
    actor.show_reward_log()


if __name__ == "__main__":
    train()

独自プログラムに変更

このプログラムはopenAI gymを使っているのでそれを独自のプログラムの部分に変えていく。

そうするとこうなる。

import random
import sys
import threading
import time

import numpy as np

import env_data


def softmax(x):
    return np.exp(x) / np.sum(np.exp(x), axis=0)


def readable_size(size):
    for unit in ['K', 'M']:
        if abs(size) < 1024.0:
            return "%.1f%sB" % (size, unit)
        size /= 1024.0
    size /= 1024.0
    return "%.1f%s" % (size, 'GB')


class Actor:

    def __init__(self, env, Q):
        self.actions = list(range(len(env)))
        # ballの距離・ballの角度・actionの種類すべてが格納されていて、対応するindexがActor.policyで選ばれる。
        if Q is None:
            self.Q = np.random.uniform(0, 1, (
                int((env.max_ball_distance - env.min_ball_distance) / env.ball_distance_interval + 1),
                int((env.max_ball_angle - env.min_ball_angle) / env.ball_angle_interval + 1), len(env)))
        else:
            self.Q = Q
        print("Q table:" + readable_size(sys.getsizeof(self.Q)))

    def policy(self, ball_distance, ball_angle):
        a = np.random.choice(self.actions, 1,
                             p=softmax(self.Q[ball_distance][ball_angle]))
        return a[0]


class Critic:

    def __init__(self, env, V):
        # self.Qとほぼ同じ
        if V is None:
            self.V = np.zeros(int(((env.max_ball_distance - env.min_ball_distance) / env.ball_distance_interval + 1) * (
                    (env.max_ball_angle - env.min_ball_angle) / env.ball_angle_interval + 1) * len(env)))
        else:
            self.V = V
        print("V table:" + readable_size(sys.getsizeof(self.V)))


class ActorCritic:
    def __init__(self, actor_class, critic_class):
        self.actor_class = actor_class
        self.critic_class = critic_class
        self.save = 0

    def train(self, name, env, data=None,gamma=0.9, learning_rate=0.1, save_interval=500):
        # Actor Critic法の実装
        if data is not None:
            print(gamma)
            actor = self.actor_class(env,data["arr_0"])
            critic = self.critic_class(env,data["arr_1"])
        else:
            actor = self.actor_class(env, None)
            critic = self.critic_class(env, None)
        s = env.reset(name)
        i = 0
        while True:
            if i % save_interval == 0 and self.save == 0:
                self.save = 1
                np.savez("agent_data_" + name + "_" + str(i), actor.Q, critic.V)
            elif i % int(save_interval/2) == 0 and self.save == 1:
                self.save = 0
            a = actor.policy(s[0], s[1])
            n_state, reward, end_action = env.step(a)

            gain = reward + gamma * critic.V[n_state[0] * n_state[1]]
            estimated = critic.V[s[0] * s[1]]
            td = gain - estimated
            actor.Q[s[0]][s[1]][end_action] += learning_rate * td
            if actor.Q[s[0]][s[1]][end_action] > 100:
                print("WARNING: Q table exceeds the threshold value.")
                actor.Q[s[0]][s[1]][end_action] /= 100
            critic.V[s[0] * s[1]] += learning_rate * td
            s = n_state
            i += 1


def train():
    # スレッド化して実行
    trainer = ActorCritic(Actor, Critic)
    if len(sys.argv) == 3:
        tmp = np.load(sys.argv[1])
        for i in range(11):
            env = env_data.env()
            threading.Thread(target=trainer.train, args=["AI1", env,tmp]).start()
    else:
        for i in range(11):
            env = env_data.env()
            threading.Thread(target=trainer.train, args=["AI1", env]).start()
    time.sleep(1)
    trainer = ActorCritic(Actor, Critic)
    if len(sys.argv) == 3:
        tmp = np.load(sys.argv[2])
        for i in range(11):
            env = env_data.env()
            threading.Thread(target=trainer.train, args=["AI2", env, tmp]).start()
    else:
        for i in range(11):
            env = env_data.env()
            threading.Thread(target=trainer.train, args=["AI2", env]).start()
    env.show_log()


if __name__ == "__main__":
    train()

プログラムの解説

main.py

いろいろなimport

まず、いろいろなimportをする。

import random
import sys
import threading
import time

import numpy as np

import env_data

用途としては

名前 説明
random 後述するNaNエラーを回避するための応急処置としてrandomを使っている。次のバージョンではなくなる予定
sys リストのメモリ使用量を調べるのに使用。
threading soccerの選手を1プロセスで24体出すために使用。
time あまり意味はないけれど念の為選手登録のときのsleep用。
numpy AIの計算(配列計算やsoftmax関数等)に使用。
env_data openAI gymに似せたプログラム構造になているAIの環境を解析や報酬関数等が設定されている。後述。

という感じ。

関数の定義
def softmax(x):
    return np.exp(x) / np.sum(np.exp(x), axis=0)


def readable_size(size):
    for unit in ['K', 'M']:
        if abs(size) < 1024.0:
            return "%.1f%sB" % (size, unit)
        size /= 1024.0
    size /= 1024.0
    return "%.1f%s" % (size, 'GB')

ここで、softmax関数と後述するリストのメモリ使用量の表示をわかりやすくするためのプログラムを定義する。

actor定義
class Actor:

    def __init__(self, env, Q):
        self.actions = list(range(len(env)))
        # ballの距離・ballの角度・actionの種類すべてが格納されていて、対応するindexがActor.policyで選ばれる。
        if Q is None:
            self.Q = np.random.uniform(0, 1, (
                int((env.max_ball_distance - env.min_ball_distance) / env.ball_distance_interval + 1),
                int((env.max_ball_angle - env.min_ball_angle) / env.ball_angle_interval + 1), len(env)))
        else:
            self.Q = Q
        print("Q table:" + readable_size(sys.getsizeof(self.Q)))

actor critic法のactorの部分を定義する。 何をやっているかというと、

self.actionsに後述のenv_data内のクラスが入った引数envの要素数のindexを作成している。

このenv_data内のクラスはlenを使うとAIが実行できるactionの数を返してくれるスグレモノ。

中身は

action数が8この場合

len(env) = 8

self.actions = [0,1,2,3,4,5,6,7]

になる。

引数QがNoneのときはnp.random.uniformでランダムの0〜1の数字のリストを作成する。

このプログラムはAIの学習データの保存、復元ができるようになっている。

QがNone以外のときは学習データがQに入っているのでself.QにQを入れるというプログラムになっている。

ランダムのリストの形式はこんな感じ。

agentとボールの距離を指定した分割量で分割して、ある程度Q tableを大まかにしているindex

agentとボールの角度を指定した分割量で分割して、ある程度Q tableを大まかにしているindex

actionの個数入っているindex

こんなことをやっている。

acotr.policy定義
def policy(self, ball_distance, ball_angle):
        a = np.random.choice(self.actions, 1,
                             p=softmax(self.Q[ball_distance][ball_angle]))
        return a[0]

このプログラムはactionからしていのQ tableの確率で選択するシステムになっている。

例えば

agentとボールの距離を指定した分割量で分割した数字が2

agentとボールの角度を指定した分割量で分割した数字が3

の場合は

self.Q[2][3]に書かれている各actionの確率でactionが選択される。

という感じ。

critic定義
class Critic:

    def __init__(self, env, V):
        # self.Qとほぼ同じ
        if V is None:
            self.V = np.zeros(int(((env.max_ball_distance - env.min_ball_distance) / env.ball_distance_interval + 1) * (
                    (env.max_ball_angle - env.min_ball_angle) / env.ball_angle_interval + 1) * len(env)))
        else:
            self.V = V
        print("V table:" + readable_size(sys.getsizeof(self.V)))

コメントにあるとおりactorとほぼ同じプログラムなので説明は割愛。

ActorCritic定義
class ActorCritic:
    def __init__(self, actor_class, critic_class):
        self.actor_class = actor_class
        self.critic_class = critic_class
        self.save = 0

やっていることは actor_classをself.actor_classに入れる

critic_class同様にやる

self.save(学習データを保存しているかどうか監視して2重保存になってファイルが壊れないようにする変数)を0(していない)にする

学習部分の定義
def train(self, name, env, data=None,gamma=0.9, learning_rate=0.1, save_interval=500):
        # Actor Critic法の実装
        if data is not None:
            print(gamma)
            actor = self.actor_class(env,data["arr_0"])
            critic = self.critic_class(env,data["arr_1"])
        else:
            actor = self.actor_class(env, None)
            critic = self.critic_class(env, None)
        s = env.reset(name)
        i = 0
        while True:
            if i % save_interval == 0 and self.save == 0:
                self.save = 1
                np.savez("agent_data_" + name + "_" + str(i), actor.Q, critic.V)
            elif i % int(save_interval/2) == 0 and self.save == 1:
                self.save = 0
            a = actor.policy(s[0], s[1])
            n_state, reward, end_action = env.step(a)

            gain = reward + gamma * critic.V[n_state[0] * n_state[1]]
            estimated = critic.V[s[0] * s[1]]
            td = gain - estimated
            actor.Q[s[0]][s[1]][end_action] += learning_rate * td
            if actor.Q[s[0]][s[1]][end_action] > 100:
                print("WARNING: Q table exceeds the threshold value.")
                actor.Q[s[0]][s[1]][end_action] /= 100
            critic.V[s[0] * s[1]] += learning_rate * td
            s = n_state
            i += 1

学習部分の定義をしている。

引数

まず引数から。

名前 既定値 説明
name - agentをサッカーサーバーに登録するときの名前を指定
env - env_data内のクラスが入った変数を指定
data None 任意。学習済みデータを指定する。
gamma 0.9 任意。割引率を指定。
learning_rate 0.1 任意。学習する割合を指定。
save_interval 500 任意。学習済みデータを保存する間隔をepisodeで指定。

詳しい説明は割愛。

学習済みデータの読み込み&actorクラスとcriticクラスを呼び出し
if data is not None:
            print(gamma)
            actor = self.actor_class(env,data["arr_0"])
            critic = self.critic_class(env,data["arr_1"])
else:
            actor = self.actor_class(env, None)
            critic = self.critic_class(env, None)

print(gamma)は特に意味はない。デバッグのやつを取り忘れたやつ。

上記の引数の説明の通り、dataがNoneではない場合(学習済みデータを読み込む場合)はactorクラスとcriticクラスの第二引数を学習済みデータにすると、学習済みデータからやってくれるプログラム。詳しくはactor定義を参照。

環境のリセット(サッカーサーバーにagent登録)
s = env.reset(name)
i = 0

agentをサッカーサーバーに登録する。openAI gymで言うenv.reset()

学習プログラム
while True:
            if i % save_interval == 0 and self.save == 0:
                self.save = 1
                np.savez("agent_data_" + name + "_" + str(i), actor.Q, critic.V)
            elif i % int(save_interval/2) == 0 and self.save == 1:
                self.save = 0
            a = actor.policy(s[0], s[1])
            n_state, reward, end_action = env.step(a)

            gain = reward + gamma * critic.V[n_state[0] * n_state[1]]
            estimated = critic.V[s[0] * s[1]]
            td = gain - estimated
            actor.Q[s[0]][s[1]][end_action] += learning_rate * td
            if actor.Q[s[0]][s[1]][end_action] > 100:
                print("WARNING: Q table exceeds the threshold value.")
                actor.Q[s[0]][s[1]][end_action] /= 100
            critic.V[s[0] * s[1]] += learning_rate * td
            s = n_state
            i += 1

Ctrl+Cでプログラムを停止させるまでは学習し続けるようにしてある。

if i % save_interval == 0 and self.save == 0:
                self.save = 1
                np.savez("agent_data_" + name + "_" + str(i), actor.Q, critic.V)
elif i % int(save_interval/2) == 0 and self.save == 1:
                self.save = 0

はepisodeが学習済みデータの保存の回数になってかつ他のagentが保存作業をやっていない場合はデータの保存を行うプログラム。

a = actor.policy(s[0], s[1])
n_state, reward, end_action = env.step(a)

このプログラムはagentの情報から行動を決定、env.stepで行動を渡し、実際に行動するプログラム。

gain = reward + gamma * critic.V[n_state[0] * n_state[1]]
estimated = critic.V[s[0] * s[1]]
td = gain - estimated
actor.Q[s[0]][s[1]][end_action] += learning_rate * td
if actor.Q[s[0]][s[1]][end_action] > 100:
    print("WARNING: Q table exceeds the threshold value.")
    actor.Q[s[0]][s[1]][end_action] /= 100
critic.V[s[0] * s[1]] += learning_rate * td
s = n_state
i += 1

これは、いろいろな計算*1をして、データの更新を行うプログラム。

if actor.Q[s[0]][s[1]][end_action] > 100:
    print("WARNING: Q table exceeds the threshold value.")
    actor.Q[s[0]][s[1]][end_action] /= 100

この部分はQ tableが異常値になってエラーになってしまい止まってしまう現象を止めるための応急処置でしきい値を設定してしきい値を超えたら、しきい値以下の数字に戻すという仕組みになっている。次のアップデートで消去予定。

エラーの内容

Traceback (most recent call last):
  File "/home/username/.pyenv/versions/3.6.4/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/username/.pyenv/versions/3.6.4/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "main.py", line 59, in train
    a = actor.policy(s[0], s[1])
  File "main.py", line 34, in policy
    p=softmax(self.Q[ball_distance][ball_angle]))
  File "mtrand.pyx", line 920, in numpy.random.mtrand.RandomState.choice
ValueError: probabilities contain NaN

soccerAIのメイン部分はこんな感じ。次はenv_dataを説明する。

長すぎるのでページ分割します。

次はこちらです。

kumitatepazuru.hatenablog.com

*1:計算の詳細については本を買って読んでね(^^)