kumitatepazuru's blog

中学生のメモブログ。みんなの役に立ちたい。

soccerAIのプログラムレベルのお話(α0.0.2) ②

この記事は以下の記事の続きです。見ていない方はまずこちらを見てください。

kumitatepazuru.hatenablog.com

解説の続き

env_data.py

env_dataはopenAI gymのプログラムにかなり似せたプログラムになっている。 プログラムソースコードはこちら。(すごい長いです)

import random

import libscips.signal
import itertools
import numpy as np
from matplotlib import pyplot as plt, animation


def logtoplot(data, interval):
    return list(map(lambda n: n.mean(), np.array_split(np.array(data), interval)))


class env(libscips.signal.player_signal):
    def __init__(self, ADDRESS="127.0.0.1", HOST="", send_log=False, recieve_log=False,
                 analysis_log=("unknown", "init", "error","hear"),
                 max_ball_distance=40, min_ball_distance=0, ball_distance_interval=10,
                 max_ball_angle=100, min_ball_angle=-100, ball_angle_interval=40,
                 max_goal_distance=55, min_goal_distance=5, goal_distance_interval=10,
                 max_goal_angle=100, min_goal_angle=-100, goal_angle_interval=40,
                 report_interval=50, noise=0.8):
        # 初期設定
        super().__init__(ADDRESS, HOST, send_log, recieve_log, analysis_log)
        self.actions = ["dash", "kick", "turn1", "turn2", "turn3", "turn4", "turn5", "kick2", "kick3"]
        self.report_interval = report_interval
        self.max_ball_distance = max_ball_distance
        self.min_ball_distance = min_ball_distance
        self.ball_distance_interval = ball_distance_interval
        self.max_ball_angle = max_ball_angle
        self.min_ball_angle = min_ball_angle
        self.ball_angle_interval = ball_angle_interval
        self.max_goal_distance = max_goal_distance
        self.min_goal_distance = min_goal_distance
        self.goal_distance_interval = goal_distance_interval
        self.max_goal_angle = max_goal_angle
        self.min_goal_angle = min_goal_angle
        self.goal_angle_interval = goal_angle_interval
        self.noise = noise
        self.turn_repeat = [0, 0]
        self.team = None
        self.enemy = None
        self.reward_log = [0]
        self.reward_repeat_log = [0]
        self.reward_action_log = [0]
        self.reward_see_log = [0]

    def __len__(self):
        # len(env)をやったときに返すデータ
        return len(self.actions)

    def _update(self, _, _2, _3):
        """グラフを更新するための関数"""

        split_num = int(len(self.reward_log) / self.report_interval) + 1

        # 現在のグラフを消去する
        plt.cla()
        # 折れ線グラフを再描画する
        plt.plot(list(range(split_num)), logtoplot(self.reward_log, split_num), "-o", label="reward")
        plt.plot(list(range(split_num)), logtoplot(self.reward_repeat_log, split_num), "-o",
                 label="repeat_reward")
        plt.plot(list(range(split_num)), logtoplot(self.reward_action_log, split_num), "-o",
                 label="action_reward")
        plt.plot(list(range(split_num)), logtoplot(self.reward_see_log, split_num), "-o",
                 label="see_reward")
        plt.text(0.8, 0.8, "episode:"+str(len(self.reward_log)), size = 15)
        plt.legend()

    def repeat(self, num, out=10):
        # 頭がもげるのを防止するプログラム(まだ自爆しようとするagentがいる)
        if self.turn_repeat[1] != num:
            self.turn_repeat[0] = 0
            self.turn_repeat[1] = num
            return 0
        else:
            self.turn_repeat[0] += 1
            if self.turn_repeat[0] > out:
                return 100
            else:
                return 0

    def step(self, action):
        # actionから移動をして報酬を計算するプログラム
        reward = 0
        reward_repeat = 0
        reward_action = 0
        reward_see = 0
        if random.random() < self.noise:
            end_action = action
            if action == 0:
                reward_repeat -= self.repeat(0, out=20)
                self.send_dash(100)
            elif action == 1:
                self.send_kick(100, 0)
            elif action == 2:
                reward_repeat -= self.repeat(1)
                self.send_turn(-30)
            elif action == 3:
                reward_repeat -= self.repeat(1)
                self.send_turn(30)
            elif action == 4:
                reward_repeat -= self.repeat(1)
                self.send_turn(-10)
            elif action == 5:
                reward_repeat -= self.repeat(1)
                self.send_turn(10)
            elif action == 6:
                reward_repeat -= self.repeat(1, out=2)
                self.send_turn(120)
            elif action == 7:
                self.send_kick(50, 0)
            elif action == 8:
                self.send_kick(25, 0)
        else:
            action = random.randrange(0,len(self.actions))
            end_action = action

        # 見えているものを確認する
        rec = {"type": None}
        rec_raw = []
        while rec["type"] != "see":
            rec_raw = self.recieve_msg()
            rec = self.msg_analysis(rec_raw)
        #     if rec["type"] == "hear":
        #         # ゴールしたときの報酬(できたらめっちゃ褒める、できなかったらめっちゃ怒ってもらう)
        #         if rec["contents"][:6] == "goal_" + self.team:
        #             self.send_move(-10, -10)
        #             reward += 1000
        #         elif rec["contents"][:6] == "goal_" + self.enemy:
        #             self.send_move(-10, -10)
        #             reward -= 1000
        #         # kick_in等のPKになったら怒ってもらう
        #         if rec["contents"][:9] == "kick_in_" + self.enemy or \
        #                 rec["contents"][:11] == "goal_kick_" + self.enemy or \
        #                 rec["contents"][:13] == "corner_kick_" + self.enemy or \
        #                 rec["contents"][:11] == "free_kick_" + self.enemy:
        #             reward -= 200
        ball_info = self.see_analysis(rec_raw, "b")

        # ボールが見えてるか
        if ball_info is None:
            ball_info = [self.max_ball_distance, self.max_ball_angle]
        else:
            ball_info = list(map(lambda n: float(n), ball_info))
            if ball_info[0] > self.max_ball_distance:
                ball_info = [self.max_ball_distance,ball_info[1]]
            elif ball_info[0] < 1:
                reward_see += 25
            # 距離に応じた報酬をもらう
            reward_see += 80 - (ball_info[0] * 2)
        if action == 1 or action == 7 or action == 8:
            if ball_info[0] < 1:
                print("INFO: agent kicked the ball!")
                reward_action += 100
            else:
                reward_action -= 100

        # ログの追加
        reward += reward_repeat + reward_action + reward_see
        self.reward_log.append(reward)
        self.reward_action_log.append(reward_action)
        self.reward_repeat_log.append(reward_repeat)
        self.reward_see_log.append(reward_see)
        return (int(ball_info[0] / self.ball_distance_interval),
                int(ball_info[1] / self.ball_angle_interval)), reward, end_action

    def show_log(self):
        # ログの表示
        # 描画領域
        fig = plt.figure(figsize=(10, 6))
        # 描画するデータ
        x = []
        y = []

        params = {
            'fig': fig,
            'func': self._update,  # グラフを更新する関数
            'fargs': (x, y),  # 関数の引数 (フレーム番号を除く)
            'interval': self.report_interval*10,  # 更新間隔 (ミリ秒)
            'frames': itertools.count(0, 1),  # フレーム番号を無限に生成するイテレータ
        }
        _ = animation.FuncAnimation(**params)

        # グラフを表示する
        plt.show()
いろいろなimport
import random

import libscips.signal
import itertools
import numpy as np
from matplotlib import pyplot as plt, animation

一応使いみちも載せておく。

名前 説明
random このAIはopenAI gymのfrozen lakeみたいに絶対にはagentの思ったとおりには進まないようにしている。(設定により変更可能)
libscips.signal このプログラムはlibscips α0.0.1を使用してサッカーサーバー感の通信をしている。libscipsについてはこちらを参照。
github.com
開発記はこちら。
kumitatepazuru.hatenablog.com
numpy グラフを表示するための平均計算に使用。それだけ。
itertools グラフを表示するために使用。
matplotlib グラフの作成に使用。

ここで気づくのはグラフ作成にmatplotlibを使用しているところ。AIやってるなら、おとなしくtensorbord使えという人もいると思いますが、自分は

tensorbordナニソレオイシイノ?(^_^;)

という人なので、matplotlibを使っています。matplotlibを舐めるな。

logtoplot定義
def logtoplot(data, interval):
    return list(map(lambda n: n.mean(), np.array_split(np.array(data), interval)))

この関数は後述のrewardlogをintervalで分割して平均化するプログラムです。かなり使える。1行にすることにこだわりました。

envクラス定義

引数がやばいを通り越してキモいになっています。

class env(libscips.signal.player_signal):
    def __init__(self, ADDRESS="127.0.0.1", HOST="", send_log=False, recieve_log=False,
                 analysis_log=("unknown", "init", "error","hear"),
                 max_ball_distance=40, min_ball_distance=0, ball_distance_interval=10,
                 max_ball_angle=100, min_ball_angle=-100, ball_angle_interval=40,
                 max_goal_distance=55, min_goal_distance=5, goal_distance_interval=10,
                 max_goal_angle=100, min_goal_angle=-100, goal_angle_interval=40,
                 report_interval=50, noise=0.8):
        # 初期設定
        super().__init__(ADDRESS, HOST, send_log, recieve_log, analysis_log)
        self.actions = ["dash", "kick", "turn1", "turn2", "turn3", "turn4", "turn5", "kick2", "kick3"]
        self.report_interval = report_interval
        self.max_ball_distance = max_ball_distance
        self.min_ball_distance = min_ball_distance
        self.ball_distance_interval = ball_distance_interval
        self.max_ball_angle = max_ball_angle
        self.min_ball_angle = min_ball_angle
        self.ball_angle_interval = ball_angle_interval
        self.max_goal_distance = max_goal_distance
        self.min_goal_distance = min_goal_distance
        self.goal_distance_interval = goal_distance_interval
        self.max_goal_angle = max_goal_angle
        self.min_goal_angle = min_goal_angle
        self.goal_angle_interval = goal_angle_interval
        self.noise = noise
        self.turn_repeat = [0, 0]
        self.team = None
        self.enemy = None
        self.reward_log = [0]
        self.reward_repeat_log = [0]
        self.reward_action_log = [0]
        self.reward_see_log = [0]

w。一応引数の説明。

名前 既定値 説明
ADDRESS "127.0.0.1" サッカーサーバーのIPを指定。
HOST "" 自分のIPを指定。
send_log False サーバーへの送信ログを表示するかを指定。
recieve_log False 受信ログを表示するかを指定。
analysis_log ("unknown", "init", "error","hear") サーバーから受信したログを表示させるかどうか。フィルターがかけられる。標準では
unknown,init,error,hear
が表示される。
max_ball_distance 40 agentが認識できるボールの距離の最大値。
min_ball_distance 0 agentが認識できるボールの距離の最小値。
ball_distance_interval 10 どのくらい分割をして認識させるか。
既定値は10個ずつで3分割されて4つになる。
max_ball_angle 100 agentが認識できるボールの角度の最大値。
min_ball_angle -100 agentが認識できるボールの角度の最小値。
ball_angle_interval 40 どのくらい分割をして認識させるか。
既定値は40個ずつで4分割されて5つになる。
max_goal_distance 55 agentが認識できるゴールの距離の最大値。
min_goal_distance 5 agentが認識できるゴールの距離の最小値。
goal_distance_interval 10 どのくらい分割をして認識させるか。
既定値は10個ずつで4分割されて5つになる。
max_goal_angle 100 agentが認識できるゴールの角度の最大値。
min_goal_angle -100 agentが認識できるゴールの角度の最小値。
goal_angle_interval 40 どのくらい分割をして認識させるか。
既定値は40個ずつで4分割されて5つになる。
report_interval 50 logを平均値化する間隔を指定。
noise 0.8 agentが指定した方向に進める確率を指定。既定値では80%の確率で進めるようになっている。

詳細については割愛。

いろいろな便利にするための定義等
def __len__(self):
        # len(env)をやったときに返すデータ
        return len(self.actions)

def _update(self, _, _2, _3):
        """グラフを更新するための関数"""

        split_num = int(len(self.reward_log) / self.report_interval) + 1

        # 現在のグラフを消去する
        plt.cla()
        # 折れ線グラフを再描画する
        plt.plot(list(range(split_num)), logtoplot(self.reward_log, split_num), "-o", label="reward")
        plt.plot(list(range(split_num)), logtoplot(self.reward_repeat_log, split_num), "-o",
                 label="repeat_reward")
        plt.plot(list(range(split_num)), logtoplot(self.reward_action_log, split_num), "-o",
                 label="action_reward")
        plt.plot(list(range(split_num)), logtoplot(self.reward_see_log, split_num), "-o",
                 label="see_reward")
        plt.text(0.8, 0.8, "episode:"+str(len(self.reward_log)), size = 15)
        plt.legend()

このプログラムによりlen(env)とやったときにlen(self.actions)が返されるようになった。 また、グラフ更新プログラムが定義された。

初期設定の定義
def reset(self, name):
        # 環境のリセット
        self.team = self.send_init(name)[0][1]
        self.enemy = "l" * (self.team == "r") + "r" * (self.team == "l")
        self.send_move(-10, -10)
        rec = {"type": None}
        rec_raw = []
        while rec["type"] != "see":
            rec_raw = self.recieve_msg()
            rec = self.msg_analysis(rec_raw)
        ball_info = self.see_analysis(rec_raw, "b")
        if ball_info is None:
            ball_info = (self.max_ball_distance, self.max_ball_angle)
        return int(float(ball_info[0]) / self.ball_distance_interval), int(float(ball_info[1]) /
                                                                    self.ball_angle_interval), self.turn_repeat[0]

環境のリセット(agentをサーバーに登録)するプログラム。

変数の説明はこんな感じ。 self.team:自分チームの色

self.enemy:相手チームの色

簡単に言うと登録して、自分チームと相手チームの色を把握してサーバーからseeメッセージが来たら、ボールがあるかを確認してあったら、それに対応するQ tableのindexを返すという感じ。

報酬計算関数定義
def repeat(self, num, out=10):
        # 頭がもげるのを防止するプログラム(まだ自爆しようとするagentがいる)
        if self.turn_repeat[1] != num:
            self.turn_repeat[0] = 0
            self.turn_repeat[1] = num
            return 0
        else:
            self.turn_repeat[0] += 1
            if self.turn_repeat[0] > out:
                return 100
            else:
                return 0

    def step(self, action):
        # actionから移動をして報酬を計算するプログラム
        reward = 0
        reward_repeat = 0
        reward_action = 0
        reward_see = 0
        if random.random() < self.noise:
            end_action = action
            if action == 0:
                reward_repeat -= self.repeat(0, out=20)
                self.send_dash(100)
            elif action == 1:
                self.send_kick(100, 0)
            elif action == 2:
                reward_repeat -= self.repeat(1)
                self.send_turn(-30)
            elif action == 3:
                reward_repeat -= self.repeat(1)
                self.send_turn(30)
            elif action == 4:
                reward_repeat -= self.repeat(1)
                self.send_turn(-10)
            elif action == 5:
                reward_repeat -= self.repeat(1)
                self.send_turn(10)
            elif action == 6:
                reward_repeat -= self.repeat(1, out=2)
                self.send_turn(120)
            elif action == 7:
                self.send_kick(50, 0)
            elif action == 8:
                self.send_kick(25, 0)
        else:
            action = random.randrange(0,len(self.actions))
            end_action = action

        # 見えているものを確認する
        rec = {"type": None}
        rec_raw = []
        while rec["type"] != "see":
            rec_raw = self.recieve_msg()
            rec = self.msg_analysis(rec_raw)
        #     if rec["type"] == "hear":
        #         # ゴールしたときの報酬(できたらめっちゃ褒める、できなかったらめっちゃ怒ってもらう)
        #         if rec["contents"][:6] == "goal_" + self.team:
        #             self.send_move(-10, -10)
        #             reward += 1000
        #         elif rec["contents"][:6] == "goal_" + self.enemy:
        #             self.send_move(-10, -10)
        #             reward -= 1000
        #         # kick_in等のPKになったら怒ってもらう
        #         if rec["contents"][:9] == "kick_in_" + self.enemy or \
        #                 rec["contents"][:11] == "goal_kick_" + self.enemy or \
        #                 rec["contents"][:13] == "corner_kick_" + self.enemy or \
        #                 rec["contents"][:11] == "free_kick_" + self.enemy:
        #             reward -= 200
        ball_info = self.see_analysis(rec_raw, "b")

        # ボールが見えてるか
        if ball_info is None:
            ball_info = [self.max_ball_distance, self.max_ball_angle]
        else:
            ball_info = list(map(lambda n: float(n), ball_info))
            if ball_info[0] > self.max_ball_distance:
                ball_info = [self.max_ball_distance,ball_info[1]]
            elif ball_info[0] < 1:
                reward_see += 25
            # 距離に応じた報酬をもらう
            reward_see += 80 - (ball_info[0] * 2)
        if action == 1 or action == 7 or action == 8:
            if ball_info[0] < 1:
                print("INFO: agent kicked the ball!")
                reward_action += 100
            else:
                reward_action -= 100

        # ログの追加
        reward += reward_repeat + reward_action + reward_see
        self.reward_log.append(reward)
        self.reward_action_log.append(reward_action)
        self.reward_repeat_log.append(reward_repeat)
        self.reward_see_log.append(reward_see)
        return (int(ball_info[0] / self.ball_distance_interval),
                int(ball_info[1] / self.ball_angle_interval)), reward, end_action

repeat関数はコメントにもあるとおり頭をブンブン振り回さないようにするプログラム。でも、頭だけだと、走りまくればいいんじゃねと考えるから、走るときにも適用させている。

actionを実際に適用
        # actionから移動をして報酬を計算するプログラム
        if random.random() < self.noise:
            end_action = action
            if action == 0:
                reward_repeat -= self.repeat(0, out=20)
                self.send_dash(100)
            elif action == 1:
                self.send_kick(100, 0)
            elif action == 2:
                reward_repeat -= self.repeat(1)
                self.send_turn(-30)
            elif action == 3:
                reward_repeat -= self.repeat(1)
                self.send_turn(30)
            elif action == 4:
                reward_repeat -= self.repeat(1)
                self.send_turn(-10)
            elif action == 5:
                reward_repeat -= self.repeat(1)
                self.send_turn(10)
            elif action == 6:
                reward_repeat -= self.repeat(1, out=2)
                self.send_turn(120)
            elif action == 7:
                self.send_kick(50, 0)
            elif action == 8:
                self.send_kick(25, 0)
        else:
            action = random.randrange(0,len(self.actions))
            end_action = action

一定確率(self.noise)によってagentが指定したactionを適用する。 なので、たまにランダムに選ばれる。

見えているものを確認
# 見えているものを確認する
        rec = {"type": None}
        rec_raw = []
        while rec["type"] != "see":
            rec_raw = self.recieve_msg()
            rec = self.msg_analysis(rec_raw)
        ball_info = self.see_analysis(rec_raw, "b")

seeメッセージがくるまでまってもし来たらボールがあるか探すプログラム。

報酬を設定
# ボールが見えてるか
        if ball_info is None:
            ball_info = [self.max_ball_distance, self.max_ball_angle]
        else:
            ball_info = list(map(lambda n: float(n), ball_info))
            if ball_info[0] > self.max_ball_distance:
                ball_info = [self.max_ball_distance,ball_info[1]]
            elif ball_info[0] < 1:
                reward_see += 25
            # 距離に応じた報酬をもらう
            reward_see += 80 - (ball_info[0] * 2)
        if action == 1 or action == 7 or action == 8:
            if ball_info[0] < 1:
                print("INFO: agent kicked the ball!")
                reward_action += 100
            else:
                reward_action -= 100

報酬の付け方は書いてあるとおりなので割愛。

報酬の分類
# ログの追加
        reward += reward_repeat + reward_action + reward_see
        self.reward_log.append(reward)
        self.reward_action_log.append(reward_action)
        self.reward_repeat_log.append(reward_repeat)
        self.reward_see_log.append(reward_see)
        return (int(ball_info[0] / self.ball_distance_interval),
                int(ball_info[1] / self.ball_angle_interval)), reward, end_action

このプログラムはrewardは

reward_repeat

reward_action

reward_see

の3種類に分けられているのを一つにまとめている。なぜまとめているかというとグラフ表示で内訳を表示するためにやっている。 最後に、returnでボールの角度と距離と報酬と最終的にサーバー側に送信したactionをmainに返して終わり。

このAIのゴールの瞬間


soccerAIゴールの瞬間(概要欄を見てね)

最後に

このプログラム書くのに2週間くらいかかって この記事書くのに3時間強かかりました。 そのおかげで文字数は約39000文字です。1ページにまとめたらすごく重くなったので2ページ分割しました。 疲れた。


個人的な質問等はこちらまで。

https://forms.gle/V6NRhoTooFw15hJdA

また、自分が参加しているRobocup soccer シミュレーションリーグのチームでは参加者募集中です!活動の見学、活動に参加したい方、ご連絡お待ちしております!

詳しくはこちら

kumitatepazuru.github.io