soccerAIのプログラムレベルのお話(α0.0.2) ②
この記事は以下の記事の続きです。見ていない方はまずこちらを見てください。
解説の続き
env_data.py
env_dataはopenAI gymのプログラムにかなり似せたプログラムになっている。 プログラムソースコードはこちら。(すごい長いです)
import random import libscips.signal import itertools import numpy as np from matplotlib import pyplot as plt, animation def logtoplot(data, interval): return list(map(lambda n: n.mean(), np.array_split(np.array(data), interval))) class env(libscips.signal.player_signal): def __init__(self, ADDRESS="127.0.0.1", HOST="", send_log=False, recieve_log=False, analysis_log=("unknown", "init", "error","hear"), max_ball_distance=40, min_ball_distance=0, ball_distance_interval=10, max_ball_angle=100, min_ball_angle=-100, ball_angle_interval=40, max_goal_distance=55, min_goal_distance=5, goal_distance_interval=10, max_goal_angle=100, min_goal_angle=-100, goal_angle_interval=40, report_interval=50, noise=0.8): # 初期設定 super().__init__(ADDRESS, HOST, send_log, recieve_log, analysis_log) self.actions = ["dash", "kick", "turn1", "turn2", "turn3", "turn4", "turn5", "kick2", "kick3"] self.report_interval = report_interval self.max_ball_distance = max_ball_distance self.min_ball_distance = min_ball_distance self.ball_distance_interval = ball_distance_interval self.max_ball_angle = max_ball_angle self.min_ball_angle = min_ball_angle self.ball_angle_interval = ball_angle_interval self.max_goal_distance = max_goal_distance self.min_goal_distance = min_goal_distance self.goal_distance_interval = goal_distance_interval self.max_goal_angle = max_goal_angle self.min_goal_angle = min_goal_angle self.goal_angle_interval = goal_angle_interval self.noise = noise self.turn_repeat = [0, 0] self.team = None self.enemy = None self.reward_log = [0] self.reward_repeat_log = [0] self.reward_action_log = [0] self.reward_see_log = [0] def __len__(self): # len(env)をやったときに返すデータ return len(self.actions) def _update(self, _, _2, _3): """グラフを更新するための関数""" split_num = int(len(self.reward_log) / self.report_interval) + 1 # 現在のグラフを消去する plt.cla() # 折れ線グラフを再描画する plt.plot(list(range(split_num)), logtoplot(self.reward_log, split_num), "-o", label="reward") plt.plot(list(range(split_num)), logtoplot(self.reward_repeat_log, split_num), "-o", label="repeat_reward") plt.plot(list(range(split_num)), logtoplot(self.reward_action_log, split_num), "-o", label="action_reward") plt.plot(list(range(split_num)), logtoplot(self.reward_see_log, split_num), "-o", label="see_reward") plt.text(0.8, 0.8, "episode:"+str(len(self.reward_log)), size = 15) plt.legend() def repeat(self, num, out=10): # 頭がもげるのを防止するプログラム(まだ自爆しようとするagentがいる) if self.turn_repeat[1] != num: self.turn_repeat[0] = 0 self.turn_repeat[1] = num return 0 else: self.turn_repeat[0] += 1 if self.turn_repeat[0] > out: return 100 else: return 0 def step(self, action): # actionから移動をして報酬を計算するプログラム reward = 0 reward_repeat = 0 reward_action = 0 reward_see = 0 if random.random() < self.noise: end_action = action if action == 0: reward_repeat -= self.repeat(0, out=20) self.send_dash(100) elif action == 1: self.send_kick(100, 0) elif action == 2: reward_repeat -= self.repeat(1) self.send_turn(-30) elif action == 3: reward_repeat -= self.repeat(1) self.send_turn(30) elif action == 4: reward_repeat -= self.repeat(1) self.send_turn(-10) elif action == 5: reward_repeat -= self.repeat(1) self.send_turn(10) elif action == 6: reward_repeat -= self.repeat(1, out=2) self.send_turn(120) elif action == 7: self.send_kick(50, 0) elif action == 8: self.send_kick(25, 0) else: action = random.randrange(0,len(self.actions)) end_action = action # 見えているものを確認する rec = {"type": None} rec_raw = [] while rec["type"] != "see": rec_raw = self.recieve_msg() rec = self.msg_analysis(rec_raw) # if rec["type"] == "hear": # # ゴールしたときの報酬(できたらめっちゃ褒める、できなかったらめっちゃ怒ってもらう) # if rec["contents"][:6] == "goal_" + self.team: # self.send_move(-10, -10) # reward += 1000 # elif rec["contents"][:6] == "goal_" + self.enemy: # self.send_move(-10, -10) # reward -= 1000 # # kick_in等のPKになったら怒ってもらう # if rec["contents"][:9] == "kick_in_" + self.enemy or \ # rec["contents"][:11] == "goal_kick_" + self.enemy or \ # rec["contents"][:13] == "corner_kick_" + self.enemy or \ # rec["contents"][:11] == "free_kick_" + self.enemy: # reward -= 200 ball_info = self.see_analysis(rec_raw, "b") # ボールが見えてるか if ball_info is None: ball_info = [self.max_ball_distance, self.max_ball_angle] else: ball_info = list(map(lambda n: float(n), ball_info)) if ball_info[0] > self.max_ball_distance: ball_info = [self.max_ball_distance,ball_info[1]] elif ball_info[0] < 1: reward_see += 25 # 距離に応じた報酬をもらう reward_see += 80 - (ball_info[0] * 2) if action == 1 or action == 7 or action == 8: if ball_info[0] < 1: print("INFO: agent kicked the ball!") reward_action += 100 else: reward_action -= 100 # ログの追加 reward += reward_repeat + reward_action + reward_see self.reward_log.append(reward) self.reward_action_log.append(reward_action) self.reward_repeat_log.append(reward_repeat) self.reward_see_log.append(reward_see) return (int(ball_info[0] / self.ball_distance_interval), int(ball_info[1] / self.ball_angle_interval)), reward, end_action def show_log(self): # ログの表示 # 描画領域 fig = plt.figure(figsize=(10, 6)) # 描画するデータ x = [] y = [] params = { 'fig': fig, 'func': self._update, # グラフを更新する関数 'fargs': (x, y), # 関数の引数 (フレーム番号を除く) 'interval': self.report_interval*10, # 更新間隔 (ミリ秒) 'frames': itertools.count(0, 1), # フレーム番号を無限に生成するイテレータ } _ = animation.FuncAnimation(**params) # グラフを表示する plt.show()
いろいろなimport
import random import libscips.signal import itertools import numpy as np from matplotlib import pyplot as plt, animation
一応使いみちも載せておく。
名前 | 説明 |
---|---|
random | このAIはopenAI gymのfrozen lakeみたいに絶対にはagentの思ったとおりには進まないようにしている。(設定により変更可能) |
libscips.signal | このプログラムはlibscips α0.0.1を使用してサッカーサーバー感の通信をしている。libscipsについてはこちらを参照。 github.com 開発記はこちら。 kumitatepazuru.hatenablog.com |
numpy | グラフを表示するための平均計算に使用。それだけ。 |
itertools | グラフを表示するために使用。 |
matplotlib | グラフの作成に使用。 |
ここで気づくのはグラフ作成にmatplotlibを使用しているところ。AIやってるなら、おとなしくtensorbord使えという人もいると思いますが、自分は
tensorbordナニソレオイシイノ?(^_^;)
という人なので、matplotlibを使っています。matplotlibを舐めるな。
logtoplot定義
def logtoplot(data, interval): return list(map(lambda n: n.mean(), np.array_split(np.array(data), interval)))
この関数は後述のrewardlogをintervalで分割して平均化するプログラムです。かなり使える。1行にすることにこだわりました。
envクラス定義
引数がやばいを通り越してキモいになっています。
class env(libscips.signal.player_signal): def __init__(self, ADDRESS="127.0.0.1", HOST="", send_log=False, recieve_log=False, analysis_log=("unknown", "init", "error","hear"), max_ball_distance=40, min_ball_distance=0, ball_distance_interval=10, max_ball_angle=100, min_ball_angle=-100, ball_angle_interval=40, max_goal_distance=55, min_goal_distance=5, goal_distance_interval=10, max_goal_angle=100, min_goal_angle=-100, goal_angle_interval=40, report_interval=50, noise=0.8): # 初期設定 super().__init__(ADDRESS, HOST, send_log, recieve_log, analysis_log) self.actions = ["dash", "kick", "turn1", "turn2", "turn3", "turn4", "turn5", "kick2", "kick3"] self.report_interval = report_interval self.max_ball_distance = max_ball_distance self.min_ball_distance = min_ball_distance self.ball_distance_interval = ball_distance_interval self.max_ball_angle = max_ball_angle self.min_ball_angle = min_ball_angle self.ball_angle_interval = ball_angle_interval self.max_goal_distance = max_goal_distance self.min_goal_distance = min_goal_distance self.goal_distance_interval = goal_distance_interval self.max_goal_angle = max_goal_angle self.min_goal_angle = min_goal_angle self.goal_angle_interval = goal_angle_interval self.noise = noise self.turn_repeat = [0, 0] self.team = None self.enemy = None self.reward_log = [0] self.reward_repeat_log = [0] self.reward_action_log = [0] self.reward_see_log = [0]
w。一応引数の説明。
名前 | 既定値 | 説明 |
---|---|---|
ADDRESS | "127.0.0.1" | サッカーサーバーのIPを指定。 |
HOST | "" | 自分のIPを指定。 |
send_log | False | サーバーへの送信ログを表示するかを指定。 |
recieve_log | False | 受信ログを表示するかを指定。 |
analysis_log | ("unknown", "init", "error","hear") | サーバーから受信したログを表示させるかどうか。フィルターがかけられる。標準では unknown,init,error,hear が表示される。 |
max_ball_distance | 40 | agentが認識できるボールの距離の最大値。 |
min_ball_distance | 0 | agentが認識できるボールの距離の最小値。 |
ball_distance_interval | 10 | どのくらい分割をして認識させるか。 既定値は10個ずつで3分割されて4つになる。 |
max_ball_angle | 100 | agentが認識できるボールの角度の最大値。 |
min_ball_angle | -100 | agentが認識できるボールの角度の最小値。 |
ball_angle_interval | 40 | どのくらい分割をして認識させるか。 既定値は40個ずつで4分割されて5つになる。 |
max_goal_distance | 55 | agentが認識できるゴールの距離の最大値。 |
min_goal_distance | 5 | agentが認識できるゴールの距離の最小値。 |
goal_distance_interval | 10 | どのくらい分割をして認識させるか。 既定値は10個ずつで4分割されて5つになる。 |
max_goal_angle | 100 | agentが認識できるゴールの角度の最大値。 |
min_goal_angle | -100 | agentが認識できるゴールの角度の最小値。 |
goal_angle_interval | 40 | どのくらい分割をして認識させるか。 既定値は40個ずつで4分割されて5つになる。 |
report_interval | 50 | logを平均値化する間隔を指定。 |
noise | 0.8 | agentが指定した方向に進める確率を指定。既定値では80%の確率で進めるようになっている。 |
詳細については割愛。
いろいろな便利にするための定義等
def __len__(self): # len(env)をやったときに返すデータ return len(self.actions) def _update(self, _, _2, _3): """グラフを更新するための関数""" split_num = int(len(self.reward_log) / self.report_interval) + 1 # 現在のグラフを消去する plt.cla() # 折れ線グラフを再描画する plt.plot(list(range(split_num)), logtoplot(self.reward_log, split_num), "-o", label="reward") plt.plot(list(range(split_num)), logtoplot(self.reward_repeat_log, split_num), "-o", label="repeat_reward") plt.plot(list(range(split_num)), logtoplot(self.reward_action_log, split_num), "-o", label="action_reward") plt.plot(list(range(split_num)), logtoplot(self.reward_see_log, split_num), "-o", label="see_reward") plt.text(0.8, 0.8, "episode:"+str(len(self.reward_log)), size = 15) plt.legend()
このプログラムによりlen(env)
とやったときにlen(self.actions)
が返されるようになった。
また、グラフ更新プログラムが定義された。
初期設定の定義
def reset(self, name): # 環境のリセット self.team = self.send_init(name)[0][1] self.enemy = "l" * (self.team == "r") + "r" * (self.team == "l") self.send_move(-10, -10) rec = {"type": None} rec_raw = [] while rec["type"] != "see": rec_raw = self.recieve_msg() rec = self.msg_analysis(rec_raw) ball_info = self.see_analysis(rec_raw, "b") if ball_info is None: ball_info = (self.max_ball_distance, self.max_ball_angle) return int(float(ball_info[0]) / self.ball_distance_interval), int(float(ball_info[1]) / self.ball_angle_interval), self.turn_repeat[0]
環境のリセット(agentをサーバーに登録)するプログラム。
変数の説明はこんな感じ。 self.team:自分チームの色
self.enemy:相手チームの色
簡単に言うと登録して、自分チームと相手チームの色を把握してサーバーからseeメッセージが来たら、ボールがあるかを確認してあったら、それに対応するQ tableのindexを返すという感じ。
報酬計算関数定義
def repeat(self, num, out=10): # 頭がもげるのを防止するプログラム(まだ自爆しようとするagentがいる) if self.turn_repeat[1] != num: self.turn_repeat[0] = 0 self.turn_repeat[1] = num return 0 else: self.turn_repeat[0] += 1 if self.turn_repeat[0] > out: return 100 else: return 0 def step(self, action): # actionから移動をして報酬を計算するプログラム reward = 0 reward_repeat = 0 reward_action = 0 reward_see = 0 if random.random() < self.noise: end_action = action if action == 0: reward_repeat -= self.repeat(0, out=20) self.send_dash(100) elif action == 1: self.send_kick(100, 0) elif action == 2: reward_repeat -= self.repeat(1) self.send_turn(-30) elif action == 3: reward_repeat -= self.repeat(1) self.send_turn(30) elif action == 4: reward_repeat -= self.repeat(1) self.send_turn(-10) elif action == 5: reward_repeat -= self.repeat(1) self.send_turn(10) elif action == 6: reward_repeat -= self.repeat(1, out=2) self.send_turn(120) elif action == 7: self.send_kick(50, 0) elif action == 8: self.send_kick(25, 0) else: action = random.randrange(0,len(self.actions)) end_action = action # 見えているものを確認する rec = {"type": None} rec_raw = [] while rec["type"] != "see": rec_raw = self.recieve_msg() rec = self.msg_analysis(rec_raw) # if rec["type"] == "hear": # # ゴールしたときの報酬(できたらめっちゃ褒める、できなかったらめっちゃ怒ってもらう) # if rec["contents"][:6] == "goal_" + self.team: # self.send_move(-10, -10) # reward += 1000 # elif rec["contents"][:6] == "goal_" + self.enemy: # self.send_move(-10, -10) # reward -= 1000 # # kick_in等のPKになったら怒ってもらう # if rec["contents"][:9] == "kick_in_" + self.enemy or \ # rec["contents"][:11] == "goal_kick_" + self.enemy or \ # rec["contents"][:13] == "corner_kick_" + self.enemy or \ # rec["contents"][:11] == "free_kick_" + self.enemy: # reward -= 200 ball_info = self.see_analysis(rec_raw, "b") # ボールが見えてるか if ball_info is None: ball_info = [self.max_ball_distance, self.max_ball_angle] else: ball_info = list(map(lambda n: float(n), ball_info)) if ball_info[0] > self.max_ball_distance: ball_info = [self.max_ball_distance,ball_info[1]] elif ball_info[0] < 1: reward_see += 25 # 距離に応じた報酬をもらう reward_see += 80 - (ball_info[0] * 2) if action == 1 or action == 7 or action == 8: if ball_info[0] < 1: print("INFO: agent kicked the ball!") reward_action += 100 else: reward_action -= 100 # ログの追加 reward += reward_repeat + reward_action + reward_see self.reward_log.append(reward) self.reward_action_log.append(reward_action) self.reward_repeat_log.append(reward_repeat) self.reward_see_log.append(reward_see) return (int(ball_info[0] / self.ball_distance_interval), int(ball_info[1] / self.ball_angle_interval)), reward, end_action
repeat関数はコメントにもあるとおり頭をブンブン振り回さないようにするプログラム。でも、頭だけだと、走りまくればいいんじゃねと考えるから、走るときにも適用させている。
actionを実際に適用
# actionから移動をして報酬を計算するプログラム if random.random() < self.noise: end_action = action if action == 0: reward_repeat -= self.repeat(0, out=20) self.send_dash(100) elif action == 1: self.send_kick(100, 0) elif action == 2: reward_repeat -= self.repeat(1) self.send_turn(-30) elif action == 3: reward_repeat -= self.repeat(1) self.send_turn(30) elif action == 4: reward_repeat -= self.repeat(1) self.send_turn(-10) elif action == 5: reward_repeat -= self.repeat(1) self.send_turn(10) elif action == 6: reward_repeat -= self.repeat(1, out=2) self.send_turn(120) elif action == 7: self.send_kick(50, 0) elif action == 8: self.send_kick(25, 0) else: action = random.randrange(0,len(self.actions)) end_action = action
一定確率(self.noise)によってagentが指定したactionを適用する。 なので、たまにランダムに選ばれる。
見えているものを確認
# 見えているものを確認する rec = {"type": None} rec_raw = [] while rec["type"] != "see": rec_raw = self.recieve_msg() rec = self.msg_analysis(rec_raw) ball_info = self.see_analysis(rec_raw, "b")
seeメッセージがくるまでまってもし来たらボールがあるか探すプログラム。
報酬を設定
# ボールが見えてるか if ball_info is None: ball_info = [self.max_ball_distance, self.max_ball_angle] else: ball_info = list(map(lambda n: float(n), ball_info)) if ball_info[0] > self.max_ball_distance: ball_info = [self.max_ball_distance,ball_info[1]] elif ball_info[0] < 1: reward_see += 25 # 距離に応じた報酬をもらう reward_see += 80 - (ball_info[0] * 2) if action == 1 or action == 7 or action == 8: if ball_info[0] < 1: print("INFO: agent kicked the ball!") reward_action += 100 else: reward_action -= 100
報酬の付け方は書いてあるとおりなので割愛。
報酬の分類
# ログの追加 reward += reward_repeat + reward_action + reward_see self.reward_log.append(reward) self.reward_action_log.append(reward_action) self.reward_repeat_log.append(reward_repeat) self.reward_see_log.append(reward_see) return (int(ball_info[0] / self.ball_distance_interval), int(ball_info[1] / self.ball_angle_interval)), reward, end_action
このプログラムはrewardは
reward_repeat
reward_action
reward_see
の3種類に分けられているのを一つにまとめている。なぜまとめているかというとグラフ表示で内訳を表示するためにやっている。 最後に、returnでボールの角度と距離と報酬と最終的にサーバー側に送信したactionをmainに返して終わり。
このAIのゴールの瞬間
最後に
このプログラム書くのに2週間くらいかかって
この記事書くのに3時間強かかりました。
そのおかげで文字数は約39000文字です。1ページにまとめたらすごく重くなったので2ページ分割しました。
疲れた。
個人的な質問等はこちらまで。
https://forms.gle/V6NRhoTooFw15hJdA
また、自分が参加しているRobocup soccer シミュレーションリーグのチームでは参加者募集中です!活動の見学、活動に参加したい方、ご連絡お待ちしております!
詳しくはこちら