diff --git a/experiments/league_alphastar.yaml b/experiments/league_alphastar.yaml new file mode 100644 index 00000000..47862c6f --- /dev/null +++ b/experiments/league_alphastar.yaml @@ -0,0 +1,87 @@ +league: + folder: "league_v2/" + max_matches: 10_000 +population: + initial_agents: + - "gym-microrts-static-files/agent_sota.pt" + structure: + - group: "main_player" + num_agents: 2 + learner: true + init_archive: true + - group: "main_exploiter" + num_agents: 2 + learner: true + init_archive: true + - group: "league_exploiter" + num_agents: 5 + learner: true + init_archive: false +archive: + algorithm: "alphastar" + args: + min_steps: 1_000_000 + max_steps: 7_500_000 + winrate_threshold: 0.7 +matchmaking: + bootstrap: "random" + bootstrap_args: + num_opponents: 2 + algorithm: "alphastar" +train: + num_workers: 2 + entrypoint: ppo_gridnet_selfplay:train + args: + prod_mode: false + num_models: 100 + num_selfplay_envs: 2 + num_bot_envs: 0 + partial_obs: false + n_minibatch: 4 + kle_stop: false + kle_rollback: false + exp_name: ppo_gridnet_selfplay + gym_id: MicroRTSGridModeVecEnv + learning_rate: 2.5e-4 + seed: 42 + total_timesteps: 5_000 + torch_deterministic: true + cuda: true + track: false + capture_video: false + num_steps: 256 + anneal_lr: true + gae: true + gamma: 0.99 + gae_lambda: 0.95 + num_minibatches: 4 + update_epochs: 4 + norm_adv: true + clip_coef: 0.2 + clip_vloss: true + ent_coef: 0.01 + vf_coef: 0.5 + max_grad_norm: 0.5 + target_kl: null +evaluate: + num_workers: 2 + reference_agents_folder: "league_v2/eval" + matches_per_opponent: 3 + entrypoint: ppo_gridnet_selfplay:evaluate + mmr: + algorithm: "trueskill" + args: + gym_id: MicroRTSGridModeVecEnv + exp_name: ppo_gridnet_selfplay + seed: 42 + torch_deterministic: true + capture_video: false + cuda: false + partial_obs: false + max_steps: 2_000 + total_timesteps: 5_000 + num_selfplay_envs: 2 + num_bot_envs: 0 + num_steps: 256 + n_minibatch: 4 + num_models: 100 \ No newline at end of file diff --git 
a/experiments/league_openfive.yaml b/experiments/league_openfive.yaml new file mode 100644 index 00000000..588a10b8 --- /dev/null +++ b/experiments/league_openfive.yaml @@ -0,0 +1,77 @@ +league: + folder: "league_v2/" + max_matches: 10_000 +population: + initial_agents: + - "gym-microrts-static-files/agent_sota.pt" + structure: + - group: "main_player" + num_agents: 1 + learner: true + init_archive: true +archive: + algorithm: "openfive" + args: + num_steps: 1_000_000 +matchmaking: + bootstrap: "random" + bootstrap_args: + num_opponents: 2 + algorithm: "openfive" +train: + num_workers: 2 + entrypoint: ppo_gridnet_selfplay:train + args: + prod_mode: false + num_models: 100 + num_selfplay_envs: 2 + num_bot_envs: 0 + partial_obs: false + n_minibatch: 4 + kle_stop: false + kle_rollback: false + exp_name: ppo_gridnet_selfplay + gym_id: MicroRTSGridModeVecEnv + learning_rate: 2.5e-4 + seed: 42 + total_timesteps: 5_000 + torch_deterministic: true + cuda: true + track: false + capture_video: false + num_steps: 256 + anneal_lr: true + gae: true + gamma: 0.99 + gae_lambda: 0.95 + num_minibatches: 4 + update_epochs: 4 + norm_adv: true + clip_coef: 0.2 + clip_vloss: true + ent_coef: 0.01 + vf_coef: 0.5 + max_grad_norm: 0.5 + target_kl: null +evaluate: + num_workers: 2 + reference_agents_folder: "league_v2/eval" + matches_per_opponent: 3 + entrypoint: ppo_gridnet_selfplay:evaluate + mmr: + algorithm: "trueskill" + args: + gym_id: MicroRTSGridModeVecEnv + exp_name: ppo_gridnet_selfplay + seed: 42 + torch_deterministic: true + capture_video: false + cuda: false + partial_obs: false + max_steps: 2_000 + total_timesteps: 5_000 + num_selfplay_envs: 2 + num_bot_envs: 0 + num_steps: 256 + n_minibatch: 4 + num_models: 100 \ No newline at end of file diff --git a/experiments/ppo_gridnet_selfplay.py b/experiments/ppo_gridnet_selfplay.py new file mode 100644 index 00000000..8ef68fff --- /dev/null +++ b/experiments/ppo_gridnet_selfplay.py @@ -0,0 +1,683 @@ +# 
http://proceedings.mlr.press/v97/han19a/han19a.pdf + +import argparse +import os +import random +import subprocess +import time +from concurrent.futures import ThreadPoolExecutor +from distutils.util import strtobool + +import numpy as np +import pandas as pd +import torch +import torch.nn as nn +import torch.optim as optim +from gym.spaces import MultiDiscrete +from stable_baselines3.common.vec_env import VecEnvWrapper, VecMonitor, VecVideoRecorder +from torch.distributions.categorical import Categorical +from torch.utils.tensorboard import SummaryWriter + +from gym_microrts import microrts_ai +from gym_microrts.envs.vec_env import MicroRTSGridModeVecEnv + + +def parse_args(): + # fmt: off + parser = argparse.ArgumentParser() + parser.add_argument('--exp-name', type=str, default=os.path.splitext(os.path.basename(__file__))[0], + help='the name of this experiment') + parser.add_argument('--gym-id', type=str, default="MicroRTSGridModeVecEnv", + help='the id of the gym environment') + parser.add_argument('--learning-rate', type=float, default=2.5e-4, + help='the learning rate of the optimizer') + parser.add_argument('--seed', type=int, default=1, + help='seed of the experiment') + parser.add_argument('--total-timesteps', type=int, default=100000000, + help='total timesteps of the experiments') + parser.add_argument('--torch-deterministic', type=lambda x: bool(strtobool(x)), default=True, nargs='?', const=True, + help='if toggled, `torch.backends.cudnn.deterministic=False`') + parser.add_argument('--cuda', type=lambda x: bool(strtobool(x)), default=True, nargs='?', const=True, + help='if toggled, cuda will not be enabled by default') + parser.add_argument('--prod-mode', type=lambda x: bool(strtobool(x)), default=False, nargs='?', const=True, + help='run the script in production mode and use wandb to log outputs') + parser.add_argument('--capture-video', type=lambda x: bool(strtobool(x)), default=False, nargs='?', const=True, + help='whether to capture videos of the agent 
performances (check out `videos` folder)') + parser.add_argument('--wandb-project-name', type=str, default="gym-microrts", + help="the wandb's project name") + parser.add_argument('--wandb-entity', type=str, default=None, + help="the entity (team) of wandb's project") + + # Algorithm specific arguments + parser.add_argument('--partial-obs', type=lambda x: bool(strtobool(x)), default=False, nargs='?', const=True, + help='if toggled, the game will have partial observability') + parser.add_argument('--n-minibatch', type=int, default=4, + help='the number of mini batch') + parser.add_argument('--num-bot-envs', type=int, default=0, + help='the number of bot game environment; 16 bot envs measn 16 games') + parser.add_argument('--num-selfplay-envs', type=int, default=24, + help='the number of self play envs; 16 self play envs means 8 games') + parser.add_argument('--num-steps', type=int, default=256, + help='the number of steps per game environment') + parser.add_argument('--gamma', type=float, default=0.99, + help='the discount factor gamma') + parser.add_argument('--gae-lambda', type=float, default=0.95, + help='the lambda for the general advantage estimation') + parser.add_argument('--ent-coef', type=float, default=0.01, + help="coefficient of the entropy") + parser.add_argument('--vf-coef', type=float, default=0.5, + help="coefficient of the value function") + parser.add_argument('--max-grad-norm', type=float, default=0.5, + help='the maximum norm for the gradient clipping') + parser.add_argument('--clip-coef', type=float, default=0.1, + help="the surrogate clipping coefficient") + parser.add_argument('--update-epochs', type=int, default=4, + help="the K epochs to update the policy") + parser.add_argument('--kle-stop', type=lambda x: bool(strtobool(x)), default=False, nargs='?', const=True, + help='If toggled, the policy updates will be early stopped w.r.t target-kl') + parser.add_argument('--kle-rollback', type=lambda x: bool(strtobool(x)), default=False, nargs='?', 
const=True, + help='If toggled, the policy updates will roll back to previous policy if KL exceeds target-kl') + parser.add_argument('--target-kl', type=float, default=0.03, + help='the target-kl variable that is referred by --kl') + parser.add_argument('--gae', type=lambda x: bool(strtobool(x)), default=True, nargs='?', const=True, + help='Use GAE for advantage computation') + parser.add_argument('--norm-adv', type=lambda x: bool(strtobool(x)), default=True, nargs='?', const=True, + help="Toggles advantages normalization") + parser.add_argument('--anneal-lr', type=lambda x: bool(strtobool(x)), default=True, nargs='?', const=True, + help="Toggle learning rate annealing for policy and value networks") + parser.add_argument('--clip-vloss', type=lambda x: bool(strtobool(x)), default=True, nargs='?', const=True, + help='Toggles whether or not to use a clipped loss for the value function, as per the paper.') + parser.add_argument('--num-models', type=int, default=200, + help='the number of models saved') + parser.add_argument('--max-eval-workers', type=int, default=2, + help='the maximum number of eval workers (skips evaluation when set to 0)') + + args = parser.parse_args() + + # fmt: on + return wrap_args(args) + + +def wrap_args(args): + if hasattr(args, 'is_wrapped'): + return args + + if not args.seed: + args.seed = int(time.time()) + args.num_envs = args.num_selfplay_envs + args.num_bot_envs + args.batch_size = int(args.num_envs * args.num_steps) + args.minibatch_size = int(args.batch_size // args.n_minibatch) + args.num_updates = args.total_timesteps // args.batch_size + args.save_frequency = max(1, int(args.num_updates // args.num_models)) + + # learning from only agent's experience (i.e., excluding agent2's experience) + args.batch_size = int(args.batch_size // 2) + args.minibatch_size = int(args.minibatch_size // 2) + + args.is_wrapped = True + return args + + +class MicroRTSStatsRecorder(VecEnvWrapper): + def __init__(self, env, gamma=0.99) -> None: + 
super().__init__(env) + self.gamma = gamma + + def reset(self): + obs = self.venv.reset() + self.raw_rewards = [[] for _ in range(self.num_envs)] + self.ts = np.zeros(self.num_envs, dtype=np.float32) + self.raw_discount_rewards = [[] for _ in range(self.num_envs)] + return obs + + def step_wait(self): + obs, rews, dones, infos = self.venv.step_wait() + newinfos = list(infos[:]) + for i in range(len(dones)): + self.raw_rewards[i] += [infos[i]["raw_rewards"]] + self.raw_discount_rewards[i] += [ + (self.gamma ** self.ts[i]) + * np.concatenate((infos[i]["raw_rewards"], infos[i]["raw_rewards"].sum()), axis=None) + ] + self.ts[i] += 1 + if dones[i]: + info = infos[i].copy() + raw_returns = np.array(self.raw_rewards[i]).sum(0) + raw_names = [str(rf) for rf in self.rfs] + raw_discount_returns = np.array(self.raw_discount_rewards[i]).sum(0) + raw_discount_names = ["discounted_" + str(rf) for rf in self.rfs] + ["discounted"] + info["microrts_stats"] = dict(zip(raw_names, raw_returns)) + info["microrts_stats"].update(dict(zip(raw_discount_names, raw_discount_returns))) + self.raw_rewards[i] = [] + self.raw_discount_rewards[i] = [] + self.ts[i] = 0 + newinfos[i] = info + return obs, rews, dones, newinfos + + +# ALGO LOGIC: initialize agent here: +class CategoricalMasked(Categorical): + def __init__(self, probs=None, logits=None, validate_args=None, masks=[], mask_value=None): + logits = torch.where(masks.bool(), logits, mask_value) + super(CategoricalMasked, self).__init__(probs, logits, validate_args) + + +class Transpose(nn.Module): + def __init__(self, permutation): + super().__init__() + self.permutation = permutation + + def forward(self, x): + return x.permute(self.permutation) + + +def layer_init(layer, std=np.sqrt(2), bias_const=0.0): + torch.nn.init.orthogonal_(layer.weight, std) + torch.nn.init.constant_(layer.bias, bias_const) + return layer + + +class Agent(nn.Module): + def __init__(self, envs, mapsize=16 * 16): + super(Agent, self).__init__() + self.mapsize = 
mapsize + h, w, c = envs.observation_space.shape + self.encoder = nn.Sequential( + Transpose((0, 3, 1, 2)), + layer_init(nn.Conv2d(c, 32, kernel_size=3, padding=1)), + nn.MaxPool2d(3, stride=2, padding=1), + nn.ReLU(), + layer_init(nn.Conv2d(32, 64, kernel_size=3, padding=1)), + nn.MaxPool2d(3, stride=2, padding=1), + nn.ReLU(), + layer_init(nn.Conv2d(64, 128, kernel_size=3, padding=1)), + nn.MaxPool2d(3, stride=2, padding=1), + nn.ReLU(), + layer_init(nn.Conv2d(128, 256, kernel_size=3, padding=1)), + nn.MaxPool2d(3, stride=2, padding=1), + ) + + self.actor = nn.Sequential( + layer_init(nn.ConvTranspose2d(256, 128, 3, stride=2, padding=1, output_padding=1)), + nn.ReLU(), + layer_init(nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1)), + nn.ReLU(), + layer_init(nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1)), + nn.ReLU(), + layer_init(nn.ConvTranspose2d(32, 78, 3, stride=2, padding=1, output_padding=1)), + Transpose((0, 2, 3, 1)), + ) + self.critic = nn.Sequential( + nn.Flatten(), + layer_init(nn.Linear(256, 128)), + nn.ReLU(), + layer_init(nn.Linear(128, 1), std=1), + ) + self.register_buffer("mask_value", torch.tensor(-1e8)) + + def get_action_and_value(self, x, action=None, invalid_action_masks=None, envs=None, device=None): + hidden = self.encoder(x) + logits = self.actor(hidden) + grid_logits = logits.reshape(-1, envs.action_plane_space.nvec.sum()) + split_logits = torch.split(grid_logits, envs.action_plane_space.nvec.tolist(), dim=1) + + if action is None: + # invalid_action_masks = torch.tensor(np.array(envs.vec_client.getMasks(0))).to(device) + invalid_action_masks = invalid_action_masks.view(-1, invalid_action_masks.shape[-1]) + split_invalid_action_masks = torch.split(invalid_action_masks, envs.action_plane_space.nvec.tolist(), dim=1) + multi_categoricals = [ + CategoricalMasked(logits=logits, masks=iam, mask_value=self.mask_value) + for (logits, iam) in zip(split_logits, split_invalid_action_masks) + ] + action = 
torch.stack([categorical.sample() for categorical in multi_categoricals]) + else: + invalid_action_masks = invalid_action_masks.view(-1, invalid_action_masks.shape[-1]) + action = action.view(-1, action.shape[-1]).T + split_invalid_action_masks = torch.split(invalid_action_masks, envs.action_plane_space.nvec.tolist(), dim=1) + multi_categoricals = [ + CategoricalMasked(logits=logits, masks=iam, mask_value=self.mask_value) + for (logits, iam) in zip(split_logits, split_invalid_action_masks) + ] + logprob = torch.stack([categorical.log_prob(a) for a, categorical in zip(action, multi_categoricals)]) + entropy = torch.stack([categorical.entropy() for categorical in multi_categoricals]) + num_predicted_parameters = len(envs.action_plane_space.nvec) + logprob = logprob.T.view(-1, self.mapsize, num_predicted_parameters) + entropy = entropy.T.view(-1, self.mapsize, num_predicted_parameters) + action = action.T.view(-1, self.mapsize, num_predicted_parameters) + return action, logprob.sum(1).sum(1), entropy.sum(1).sum(1), invalid_action_masks, self.critic(hidden) + + def get_value(self, x): + return self.critic(self.encoder(x)) + + +def run_evaluation(model_path: str, output_path: str): + args = [ + "python", + "league.py", + "--evals", + model_path, + "--update-db", + "false", + "--cuda", + "false", + "--output-path", + output_path, + ] + fd = subprocess.Popen(args) + print(f"Evaluating {model_path}") + return_code = fd.wait() + assert return_code == 0 + return (model_path, output_path) + + +# xxx(okachaiev): add support for PvE evaluation +def evaluate(args, agent_path, opponent_path): + # xxx(okachaiev): i need to find a better way of dealing with the API like this + args = wrap_args(args) + wins, losses, draws = 0, 0, 0 + device = torch.device("cuda" if torch.cuda.is_available() and args.cuda else "cpu") + + # xxx(okachaiev): would be nice if we can setup this with initializer and clean up later + envs = MicroRTSGridModeVecEnv( + num_bot_envs=0, + num_selfplay_envs=2, + 
partial_obs=args.partial_obs, + max_steps=args.max_steps, + render_theme=2, + map_paths=["maps/16x16/basesWorkers16x16A.xml"], + reward_weight=np.array([10.0, 1.0, 1.0, 0.2, 1.0, 4.0]), + ) + + agent = Agent(envs).to(device) + agent.load_state_dict(torch.load(agent_path, map_location=device)) + agent.eval() + agent2 = Agent(envs).to(device) + agent2.load_state_dict(torch.load(opponent_path, map_location=device)) + agent2.eval() + + next_obs = torch.Tensor(envs.reset()).to(device) + for _ in range(args.total_timesteps): + # self.envs.render() + # ALGO LOGIC: put action logic here + with torch.no_grad(): + mask = torch.tensor(np.array(envs.get_action_mask())).to(device) + + p1_obs = next_obs[::2] + p2_obs = next_obs[1::2] + p1_mask = mask[::2] + p2_mask = mask[1::2] + + p1_action, _, _, _, _ = agent.get_action_and_value( + p1_obs, envs=envs, invalid_action_masks=p1_mask, device=device + ) + p2_action, _, _, _, _ = agent2.get_action_and_value( + p2_obs, envs=envs, invalid_action_masks=p2_mask, device=device + ) + action = torch.zeros((envs.num_envs, p2_action.shape[1], p2_action.shape[2])) + action[::2] = p1_action + action[1::2] = p2_action + + try: + next_obs, rs, ds, infos = envs.step(action.cpu().numpy().reshape(envs.num_envs, -1)) + next_obs = torch.Tensor(next_obs).to(device) + except Exception as e: + print(e) + raise + + if ds[0]: + p1_win_loss = infos[0]['raw_rewards'][0] + if p1_win_loss == 1.: + wins += 1 + elif p1_win_loss == -1.: + losses += 1 + else: + draws += 1 + + return wins, losses, draws + + +class TrueskillWriter: + def __init__(self, prod_mode, writer, league_path: str, league_step_path: str): + self.prod_mode = prod_mode + self.writer = writer + self.trueskill_df = pd.read_csv(league_path) + self.trueskill_step_df = pd.read_csv(league_step_path) + self.trueskill_step_df["type"] = self.trueskill_step_df["name"] + self.trueskill_step_df["step"] = 0 + # xxx(okachaiev): not sure we need this copy + self.preset_trueskill_step_df = 
self.trueskill_step_df.copy() + + def on_evaluation_done(self, future): + if future.cancelled(): + return + model_path, output_path = future.result() + league = pd.read_csv(output_path, index_col="name") + assert model_path in league.index + model_global_step = int(model_path.split("/")[-1][:-3]) + self.writer.add_scalar("charts/trueskill", league.loc[model_path]["trueskill"], model_global_step) + print(f"global_step={model_global_step}, trueskill={league.loc[model_path]['trueskill']}") + + # table visualization logic + if self.prod_mode: + trueskill_data = { + "name": league.loc[model_path].name, + "mu": league.loc[model_path]["mu"], + "sigma": league.loc[model_path]["sigma"], + "trueskill": league.loc[model_path]["trueskill"], + } + self.trueskill_df = self.trueskill_df.append(trueskill_data, ignore_index=True) + wandb.log({"trueskill": wandb.Table(dataframe=self.trueskill_df)}) + trueskill_data["type"] = "training" + trueskill_data["step"] = model_global_step + self.trueskill_step_df = self.trueskill_step_df.append(trueskill_data, ignore_index=True) + preset_trueskill_step_df_clone = self.preset_trueskill_step_df.copy() + preset_trueskill_step_df_clone["step"] = model_global_step + self.trueskill_step_df = self.trueskill_step_df.append(preset_trueskill_step_df_clone, ignore_index=True) + wandb.log({"trueskill_step": wandb.Table(dataframe=self.trueskill_step_df)}) + + +# xxx(okachaiev): use second agent if provided +def train(args, agent_path=None, opponent_path=None): + args = wrap_args(args) + + # TRY NOT TO MODIFY: setup the environment + experiment_name = f"{args.gym_id}__{args.exp_name}__{args.seed}__{int(time.time())}" + if args.prod_mode: + import wandb + + run = wandb.init( + project=args.wandb_project_name, + entity=args.wandb_entity, + # sync_tensorboard=True, + config=vars(args), + name=experiment_name, + monitor_gym=True, + save_code=True, + ) + wandb.tensorboard.patch(save=False) + writer = SummaryWriter(f"runs/{experiment_name}") + writer.add_text( 
+ "hyperparameters", "|param|value|\n|-|-|\n%s" % ("\n".join([f"|{key}|{value}|" for key, value in vars(args).items()])) + ) + + # TRY NOT TO MODIFY: seeding + device = torch.device("cuda" if torch.cuda.is_available() and args.cuda else "cpu") + random.seed(args.seed) + np.random.seed(args.seed) + torch.manual_seed(args.seed) + torch.backends.cudnn.deterministic = args.torch_deterministic + ai_bots = [] + if args.num_bot_envs > 0: + ai_bots = [microrts_ai.coacAI for _ in range(args.num_bot_envs - 6)] \ + + [microrts_ai.randomBiasedAI for _ in range(min(args.num_bot_envs, 2))] \ + + [microrts_ai.lightRushAI for _ in range(min(args.num_bot_envs, 2))] \ + + [microrts_ai.workerRushAI for _ in range(min(args.num_bot_envs, 2))] + + envs = MicroRTSGridModeVecEnv( + # xxx(okachaiev): i'm still not sure if it's possible to execute both selfplay and + # PvE within the same env setup. seems like the approach with SlimeVolley was quite + # decent + # num_selfplay_envs=args.num_selfplay_envs, + # num_bot_envs=args.num_bot_envs, + num_selfplay_envs=2, + num_bot_envs=0, + partial_obs=args.partial_obs, + max_steps=2000, + render_theme=2, + ai2s=ai_bots, + map_paths=["maps/16x16/basesWorkers16x16.xml"], + reward_weight=np.array([10.0, 1.0, 1.0, 0.2, 1.0, 4.0]), + ) + envs = MicroRTSStatsRecorder(envs, args.gamma) + envs = VecMonitor(envs) + if args.capture_video: + envs = VecVideoRecorder( + envs, f"videos/{experiment_name}", record_video_trigger=lambda x: x % 100000 == 0, video_length=2000 + ) + assert isinstance(envs.action_space, MultiDiscrete), "only MultiDiscrete action space is supported" + + agent = Agent(envs).to(device) + if agent_path is not None: + agent.load_state_dict(torch.load(agent_path, map_location=device)) + + if opponent_path is not None: + agent2 = Agent(envs).to(device) + agent2.load_state_dict(torch.load(opponent_path, map_location=device)) + agent2.eval() + + wins, losses, draws = 0, 0, 0 + + optimizer = optim.Adam(agent.parameters(), lr=args.learning_rate, 
eps=1e-5) + if args.anneal_lr: + # https://github.com/openai/baselines/blob/ea25b9e8b234e6ee1bca43083f8f3cf974143998/baselines/ppo2/defaults.py#L20 + lr = lambda f: f * args.learning_rate + + # ALGO Logic: Storage for epoch data + mapsize = 16 * 16 + action_space_shape = (mapsize, len(envs.action_plane_space.nvec)) + invalid_action_shape = (mapsize, envs.action_plane_space.nvec.sum()) + + obs = torch.zeros((args.num_steps, args.num_envs) + envs.observation_space.shape).to(device) + actions = torch.zeros((args.num_steps, args.num_envs) + action_space_shape).to(device) + logprobs = torch.zeros((args.num_steps, args.num_envs)).to(device) + rewards = torch.zeros((args.num_steps, args.num_envs)).to(device) + dones = torch.zeros((args.num_steps, args.num_envs)).to(device) + values = torch.zeros((args.num_steps, args.num_envs)).to(device) + invalid_action_masks = torch.zeros((args.num_steps, args.num_envs) + invalid_action_shape).to(device) + # TRY NOT TO MODIFY: start the game + global_step = 0 + start_time = time.time() + # Note how `next_obs` and `next_done` are used; their usage is equivalent to + # https://github.com/ikostrikov/pytorch-a2c-ppo-acktr-gail/blob/84a7582477fb0d5c82ad6d850fe476829dddd2e1/a2c_ppo_acktr/storage.py#L60 + next_obs = torch.Tensor(envs.reset()).to(device) + next_done = torch.zeros(args.num_envs).to(device) + + ## CRASH AND RESUME LOGIC: + starting_update = 1 + + if args.prod_mode and wandb.run.resumed: + starting_update = run.summary.get("charts/update") + 1 + global_step = starting_update * args.batch_size + api = wandb.Api() + run = api.run(f"{run.entity}/{run.project}/{run.id}") + model = run.file("agent.pt") + model.download(f"models/{experiment_name}/") + agent.load_state_dict(torch.load(f"models/{experiment_name}/agent.pt", map_location=device)) + agent.eval() + print(f"resumed at update {starting_update}") + + print("Model's state_dict:") + for param_tensor in agent.state_dict(): + print(param_tensor, "\t", 
agent.state_dict()[param_tensor].size()) + total_params = sum([param.nelement() for param in agent.parameters()]) + print("Model's total parameters:", total_params) + + ## EVALUATION LOGIC: + trueskill_writer = TrueskillWriter( + args.prod_mode, writer, "gym-microrts-static-files/league.csv", "gym-microrts-static-files/league.csv" + ) + + for update in range(starting_update, args.num_updates + 1): + # Annealing the rate if instructed to do so. + if args.anneal_lr: + frac = 1.0 - (update - 1.0) / args.num_updates + lrnow = lr(frac) + optimizer.param_groups[0]["lr"] = lrnow + + # TRY NOT TO MODIFY: prepare the execution of the game. + for step in range(0, args.num_steps): + # envs.render() + global_step += 1 * args.num_envs + obs[step] = next_obs + dones[step] = next_done + # ALGO LOGIC: put action logic here + with torch.no_grad(): + invalid_action_masks[step] = torch.tensor(np.array(envs.get_action_mask())).to(device) + + p1_obs = next_obs[::2] + p2_obs = next_obs[1::2] + p1_mask = invalid_action_masks[step][::2] + p2_mask = invalid_action_masks[step][1::2] + + p1_action, p1_logproba, _, _, p1_vs = agent.get_action_and_value( + p1_obs, envs=envs, invalid_action_masks=p1_mask, device=device + ) + p2_action, p2_logproba, _, _, p2_vs = agent2.get_action_and_value( + p2_obs, envs=envs, invalid_action_masks=p2_mask, device=device + ) + action = torch.zeros((args.num_envs, p2_action.shape[1], p2_action.shape[2])) + action[::2] = p1_action + action[1::2] = p2_action + + # action, logproba, _, _, vs = agent.get_action_and_value( + # next_obs, envs=envs, invalid_action_masks=invalid_action_masks[step], device=device + # ) + values[step][::2] = p1_vs.flatten() + values[step][1::2] = p2_vs.flatten() + + actions[step] = action + logprobs[step][::2] = p1_logproba + logprobs[step][1::2] = p2_logproba + try: + next_obs, rs, ds, infos = envs.step(action.cpu().numpy().reshape(envs.num_envs, -1)) + next_obs = torch.Tensor(next_obs).to(device) + except Exception as e: + 
print(e) + raise + rewards[step], next_done = torch.Tensor(rs).to(device), torch.Tensor(ds).to(device) + + # xxx(okachaiev): this only works for a single opponent + if ds[0]: + p1_win_loss = infos[0]['raw_rewards'][0] + if p1_win_loss == 1.: + wins += 1 + elif p1_win_loss == -1.: + losses += 1 + else: + draws += 1 + + for info in infos: + if "episode" in info.keys(): + print(f"global_step={global_step}, episodic_return={info['episode']['r']}") + writer.add_scalar("charts/episodic_return", info["episode"]["r"], global_step) + writer.add_scalar("charts/episodic_length", info["episode"]["l"], global_step) + for key in info["microrts_stats"]: + writer.add_scalar(f"charts/episodic_return/{key}", info["microrts_stats"][key], global_step) + break + + # bootstrap reward if not done. reached the batch limit + with torch.no_grad(): + last_value = agent.get_value(next_obs.to(device)).reshape(1, -1) + if args.gae: + advantages = torch.zeros_like(rewards).to(device) + lastgaelam = 0 + for t in reversed(range(args.num_steps)): + if t == args.num_steps - 1: + nextnonterminal = 1.0 - next_done + nextvalues = last_value + else: + nextnonterminal = 1.0 - dones[t + 1] + nextvalues = values[t + 1] + delta = rewards[t] + args.gamma * nextvalues * nextnonterminal - values[t] + advantages[t] = lastgaelam = delta + args.gamma * args.gae_lambda * nextnonterminal * lastgaelam + returns = advantages + values + else: + returns = torch.zeros_like(rewards).to(device) + for t in reversed(range(args.num_steps)): + if t == args.num_steps - 1: + nextnonterminal = 1.0 - next_done + next_return = last_value + else: + nextnonterminal = 1.0 - dones[t + 1] + next_return = returns[t + 1] + returns[t] = rewards[t] + args.gamma * nextnonterminal * next_return + advantages = returns - values + + # flatten the batch + b_obs = obs[:,::2].reshape((-1,) + envs.observation_space.shape) + b_logprobs = logprobs[:,::2].reshape(-1) + b_actions = actions[:,::2].reshape((-1,) + action_space_shape) + 
b_advantages = advantages[:,::2].reshape(-1) + b_returns = returns[:,::2].reshape(-1) + b_values = values[:,::2].reshape(-1) + b_invalid_action_masks = invalid_action_masks[:,::2].reshape((-1,) + invalid_action_shape) + + # Optimizing the policy and value network + inds = np.arange( + args.batch_size, + ) + for i_epoch_pi in range(args.update_epochs): + np.random.shuffle(inds) + for start in range(0, args.batch_size, args.minibatch_size): + end = start + args.minibatch_size + minibatch_ind = inds[start:end] + mb_advantages = b_advantages[minibatch_ind] + if args.norm_adv: + mb_advantages = (mb_advantages - mb_advantages.mean()) / (mb_advantages.std() + 1e-8) + _, newlogproba, entropy, _, new_values = agent.get_action_and_value( + b_obs[minibatch_ind], b_actions.long()[minibatch_ind], b_invalid_action_masks[minibatch_ind], envs, device + ) + ratio = (newlogproba - b_logprobs[minibatch_ind]).exp() + + # Stats + approx_kl = (b_logprobs[minibatch_ind] - newlogproba).mean() + + # Policy loss + pg_loss1 = -mb_advantages * ratio + pg_loss2 = -mb_advantages * torch.clamp(ratio, 1 - args.clip_coef, 1 + args.clip_coef) + pg_loss = torch.max(pg_loss1, pg_loss2).mean() + entropy_loss = entropy.mean() + + # Value loss + new_values = new_values.view(-1) + if args.clip_vloss: + v_loss_unclipped = (new_values - b_returns[minibatch_ind]) ** 2 + v_clipped = b_values[minibatch_ind] + torch.clamp( + new_values - b_values[minibatch_ind], -args.clip_coef, args.clip_coef + ) + v_loss_clipped = (v_clipped - b_returns[minibatch_ind]) ** 2 + v_loss_max = torch.max(v_loss_unclipped, v_loss_clipped) + v_loss = 0.5 * v_loss_max.mean() + else: + v_loss = 0.5 * ((new_values - b_returns[minibatch_ind]) ** 2).mean() + + loss = pg_loss - args.ent_coef * entropy_loss + v_loss * args.vf_coef + + optimizer.zero_grad() + loss.backward() + nn.utils.clip_grad_norm_(agent.parameters(), args.max_grad_norm) + optimizer.step() + + if (update - 1) % args.save_frequency == 0: + if agent_path is not None: + 
torch.save(agent.state_dict(), agent_path) + + if not os.path.exists(f"models/{experiment_name}"): + os.makedirs(f"models/{experiment_name}") + torch.save(agent.state_dict(), f"models/{experiment_name}/agent.pt") + torch.save(agent.state_dict(), f"models/{experiment_name}/{global_step}.pt") + if args.prod_mode: + wandb.save(f"models/{experiment_name}/agent.pt", base_path=f"models/{experiment_name}", policy="now") + + # TRY NOT TO MODIFY: record rewards for plotting purposes + writer.add_scalar("charts/learning_rate", optimizer.param_groups[0]["lr"], global_step) + writer.add_scalar("charts/update", update, global_step) + writer.add_scalar("losses/value_loss", v_loss.item(), global_step) + writer.add_scalar("losses/policy_loss", pg_loss.item(), global_step) + writer.add_scalar("losses/entropy", entropy.mean().item(), global_step) + writer.add_scalar("losses/approx_kl", approx_kl.item(), global_step) + if args.kle_stop or args.kle_rollback: + writer.add_scalar("debug/pg_stop_iter", i_epoch_pi, global_step) + writer.add_scalar("charts/sps", int(global_step / (time.time() - start_time)), global_step) + print("SPS:", int(global_step / (time.time() - start_time))) + + if agent_path is not None: + torch.save(agent.state_dict(), agent_path) + + writer.close() + + return wins, losses, draws, {} + +if __name__ == "__main__": + args = parse_args() + train(args, opponent_path="league_v2/eval/agent_sota.pt") \ No newline at end of file diff --git a/experiments/run_league.py b/experiments/run_league.py new file mode 100644 index 00000000..4b4f64fa --- /dev/null +++ b/experiments/run_league.py @@ -0,0 +1,767 @@ +import argparse +from concurrent.futures import ProcessPoolExecutor +import concurrent.futures +from collections import defaultdict, deque, OrderedDict +from dataclasses import dataclass +from enum import Enum +from itertools import combinations +import importlib +import multiprocessing as mp +import numpy as np +from operator import itemgetter +import os.path +import 
pandas as pd +from pathlib import Path +from distutils.util import strtobool +import pickle +import random +import signal +import shutil +import time +from trueskill import Rating, quality_1vs1, rate_1vs1 +from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union +import yaml + + +def init_worker(): + signal.signal(signal.SIGINT, signal.SIG_IGN) + + +def prepare_downstream_args(args, downstream_name: str): + parser = argparse.ArgumentParser() + downstream_args = parser.parse_args([]) + downstream_args.__dict__.update(args.config[downstream_name]["args"]) + return downstream_args + + +def load_entrypoint(args, entrypoint_name: str): + module, fn = args.config[entrypoint_name]["entrypoint"].split(":") + return getattr(importlib.import_module(module), fn) + + +def load_algorithms(args) -> Tuple[Callable, Callable, Callable, Callable]: + matchmaking_algo = args.config["matchmaking"]["algorithm"] + # xxx(okachaiev): enum? preset classes? + if matchmaking_algo == "alphastar": + league_pick_opponent = alphstar_pick_opponent + elif matchmaking_algo == "openfive": + league_pick_opponent = openfive_pick_opponent + elif matchmaking_algo == "custom": + league_pick_opponent = load_entrypoint(args, "matchmaking") + else: + raise ValueError("Unsupported matchmaking algorithm") + + archive_algo = args.config["archive"]["algorithm"] + if archive_algo == "alphastar": + league_requires_archival = alphastar_requires_archival + elif archive_algo == "openfive": + league_requires_archival = openfive_requires_archival + elif archive_algo == "custom": + league_requires_archival = load_entrypoint(args, "archive") + else: + raise ValueError("Unsupported archival algorithm") + + # xxx(okachaiev): add flexiblity here in future + league_requires_evaluation = league_requires_archival + + bootstrap_algo = args.config["matchmaking"]["bootstrap"] + if bootstrap_algo == "random": + league_bootstrap_opponent = random_bootstrap_opponent + elif bootstrap_algo == "none": + # xxx(okachaiev): as of 
now, this option makes no sense because + # a new opponent is picked up only when previous game is finished + # thus bootstrapping ensures that matchmaking is invoked + # i should use normal "pick opponent" when bootstrap is skipped + league_bootstrap_opponent = noop_bootstrap_opponent + else: + raise ValueError("Unsupported bootstrap algorithm") + + return (league_bootstrap_opponent, league_pick_opponent, league_requires_archival, league_requires_evaluation) + + +def parse_args(): + parser = argparse.ArgumentParser() + # xxx(okachaiev): find balance for what goes into config file vs. what is defined here + # i think things like folder, initial_agents, etc should be in the script + # annd file should only be used as a set of "hyperparams" for the league and it's structure + parser.add_argument("--config-file", type=Path, required=True) + parser.add_argument("--resume", type=lambda x: bool(strtobool(x)), default=False, nargs="?", const=True) + + args = parser.parse_args() + assert args.config_file.exists() + # xxx(okachaiev): I need to find a better way to manage args + configuration file + with args.config_file.open("r") as f: + args.config = yaml.safe_load(f) + + args.checkpoints_folder = Path(args.config["league"]["folder"]).joinpath("checkpoints") + args.checkpoints_folder.mkdir(exist_ok=True) + args.agents_folder = Path(args.config["league"]["folder"]).joinpath("agents") + args.agents_folder.mkdir(exist_ok=True) + + args.train_entrypoint = load_entrypoint(args, "train") + args.evaluate_entrypoint = load_entrypoint(args, "evaluate") + + return args + + +def run_train_loop(args, match): + train_args = prepare_downstream_args(args, "train") + # xxx(okachaiev): I should probably create a copy of args namespace here + train_args.seed = np.random.randint(0, 1_000_000) + (wins, losses, draws), info = args.train_entrypoint(train_args, str(match.p1), str(match.p2)) + match_result = MatchResult.for_match(match, (wins, losses, draws), info) + + # xxx(okachaiev): good 
logger would be nice to have + print(f"Finished match up {match.p1} vs. {match.p2}: {match_result.payoff()}") + + return match_result + + +def find_reference_opponents(args): + # xxx(okachaiev): what about PvE agents? + # xxx(okachaiev): definitely need to be more flexible here, e.g. SB3 produces .zip files + reference_opponent_path = list(Path(args.config["evaluate"]["reference_agents_folder"]).glob("*.pt")) + return [(p.stem, str(p)) for p in reference_opponent_path] + + +def prepare_eval_reference(args): + eval_args = prepare_downstream_args(args, "evaluate") + + # if leaderboard is there and not forced, no need to run + # xxx(okachaiev): ideally this should be "additive", e.g. when min num games + # is more than was actually played, we can just play more games. or if a + # new agent was added + leaderboard_path = Path(args.config["evaluate"]["reference_agents_folder"]).joinpath("leaderboard.csv") + if leaderboard_path.exists(): + print("Evaluation leaderboard exists, bailing out") + print(pd.read_csv(leaderboard_path)) + return + + reference_opponents = find_reference_opponents(args) + assert len(reference_opponents) > 0, "No reference agents found" + + # xxx(okachaiev): should have configuration for start mu/var + mmr = defaultdict(lambda: (0, Rating())) + + # xxx(okachaiev): algo should be pluggable, with good presets + pairs = list(combinations(reference_opponents, 2)) + for _ in range(args.config["evaluate"]["matches_per_opponent"]): + # xxx(okachaiev): i can use joblib to run in parallel + # we don't need to run MMR eval immediately after the game finished + # as we do full round-robin anyways + np.random.shuffle(pairs) + for (p1_name, p1_path), (p2_name, p2_path) in pairs: + (p1_num_games, p1_rating), (p2_num_games, p2_rating) = mmr[p1_name], mmr[p2_name] + + print(f"Playing {p1_name} vs. {p2_name}") + + (wins, losses, draws) = args.evaluate_entrypoint(eval_args, p1_path, p2_path) + + print(f"Evaluation for {p1_name} vs. 
{p2_name}: {(wins, losses, draws)}") + + num_games = wins + losses + draws + + if wins > 0: + for _ in range(wins): + p1_rating, p2_rating = rate_1vs1( + p1_rating, p2_rating, drawn=False + ) + + if losses > 0: + for _ in range(losses): + p2_rating, p1_rating = rate_1vs1( + p2_rating, p1_rating, drawn=False + ) + + if draws > 0: + for _ in range(draws): + p1_rating, p2_rating = rate_1vs1( + p1_rating, p2_rating, drawn=True + ) + + mmr[p1_name] = (p1_num_games+num_games, p1_rating) + mmr[p2_name] = (p2_num_games+num_games, p2_rating) + + print(f"New ratings {p1_name}={p1_rating} vs. {p2_name}={p2_rating}") + + rows = [(name, num_games, rating.mu, rating.sigma) for name, (num_games, rating) in mmr.items()] + rows.sort(key=itemgetter(2), reverse=True) + + df = pd.DataFrame(rows, columns=("name", "num_games", "mu", "sigma")) + df.to_csv(leaderboard_path) + + print(df) + + +@dataclass +class EvaluationResult: + p1: str + mmr: Rating + +# xxx(okachaiev): have to update this to respect flexibility with MMR algos +def run_eval_loop(args, player, shutdown_flag): + eval_args = prepare_downstream_args(args, "evaluate") + + leaderboard_file = Path(args.config["evaluate"]["reference_agents_folder"]).joinpath("leaderboard.csv") + + assert leaderboard_file.exists(), "MMR evaluation for reference agents should be provided" + + leaderboard = pd.read_csv(leaderboard_file) + reference_opponents = find_reference_opponents(args) + + assert set(leaderboard.name) == set([name for name, _ in reference_opponents]), "All reference agents have to be evaluated" + + mmr = { + name: Rating(mu, sigma) + for (name, mu, sigma) + in leaderboard[["name", "mu", "sigma"]].to_numpy().tolist() + } + current_rating = Rating() + + # one-vs.-all strategy, this one should be pluggable + for _ in range(args.config["evaluate"]["games_per_opponent"]): + np.random.shuffle(reference_opponents) + for (opponent_name, opponent_path) in reference_opponents: + # the main process got interrupted + if 
shutdown_flag.is_set(): return None + + # xxx(okachaiev): API for running batch eval (with multiple opponents) + (wins, losses, draws) = args.evaluate_entrypoint(eval_args, player.save_path, opponent_path) + opponent_rating = mmr[opponent_name] + + # xxx(okachaiev): code duplication + if wins > 0: + for _ in range(wins): + current_rating, opponent_rating = rate_1vs1( + current_rating, opponent_rating, drawn=False + ) + + if losses > 0: + for _ in range(losses): + opponent_rating, current_rating = rate_1vs1( + opponent_rating, current_rating, drawn=False + ) + + if draws > 0: + for _ in range(draws): + current_rating, opponent_rating = rate_1vs1( + current_rating, opponent_rating, drawn=True + ) + + print(f"Trueskill evaluation for {player.save_path} is {current_rating}") + + # xxx(okachaiev): do we need to save it back???? + # it seems like overwrite is only necessary when we add new reference agents, + # otherwise MMR has to stay the same + + return EvaluationResult(player.save_path, current_rating) + + +# xxx(okachaiev): should this one be named "Outcome"??? 
@dataclass
class Payoff:
    """Aggregated head-to-head outcome between an (ordered) pair of players."""
    wins: int
    draws: int
    losses: int

    @classmethod
    def empty(cls) -> "Payoff":
        """Zero-game payoff; used as the defaultdict factory in PayoffTable."""
        return cls(0, 0, 0)

    @property
    def num_games(self):
        return self.wins + self.draws + self.losses

    @property
    def winrate(self):
        # 0.5 prior for a pair that has not played yet; draws count as half a win
        if self.num_games == 0: return 0.5
        return (self.wins + 0.5*self.draws) / self.num_games

    def __add__(self, other):
        return self.__class__(self.wins + other.wins, self.draws + other.draws, self.losses + other.losses)

    def __iadd__(self, other):
        self.wins += other.wins
        self.draws += other.draws
        self.losses += other.losses
        return self

    def __neg__(self):
        # the same series seen from the opponent's side: wins/losses swap
        return self.__class__(self.losses, self.draws, self.wins)


# xxx(okachaiev): using Enum here is extremely limited, unfortunately
class PlayerBracket(Enum):
    """League role of an agent, following the AlphaStar population structure."""
    MAIN_PLAYER = 0
    MAIN_EXPLOITER = 1
    LEAGUE_EXPLOITER = 2
    ARCHIVED = 3

    @classmethod
    def from_str(cls, name: str) -> "PlayerBracket":
        """Case-insensitive lookup, e.g. "main_player" -> MAIN_PLAYER."""
        name = name.lower()
        for opt in cls:
            if opt.name.lower() == name:
                return opt
        raise ValueError("Unknown player bracket")


@dataclass
class Player:
    bracket: PlayerBracket
    # path to the on-disk checkpoint; doubles as the player's id in PayoffTable
    save_path: str
    # xxx(okachaiev): name seems more human-like than id
    name: str
    # for ARCHIVED players: the live player this snapshot was taken from
    parent: Optional["Player"] = None

    @property
    def learner(self):
        # every non-archived bracket keeps training
        return self.bracket != PlayerBracket.ARCHIVED

    # xxx(okachaiev): replace with fancy random generated names
    @staticmethod
    def _generate_name() -> str:
        t = str(int(time.time()))
        r = random.randint(0, 10_000)
        return f"{t}-{r:05d}"

    @staticmethod
    def _prepare_storage(bracket: PlayerBracket, save_dir: Path, player_name: str) -> Path:
        """Create the per-bracket folder and return the checkpoint path inside it."""
        save_path = save_dir.joinpath(bracket.name.lower())
        save_path.mkdir(exist_ok=True, parents=True)
        return save_path.joinpath(f"{player_name}.pt")

    @classmethod
    def from_model(cls, agent_model_path: str, bracket: PlayerBracket, save_dir: Path) -> "Player":
        """Create a new league player by copying an existing model checkpoint."""
        player_name = Player._generate_name()
        save_path = Player._prepare_storage(bracket, save_dir, player_name)
        shutil.copyfile(agent_model_path, save_path)
        return cls(bracket, str(save_path), player_name)

    def archive(self, save_dir: Path):
        """Snapshot this player into the ARCHIVED bracket (copying the checkpoint)."""
        new_player_name = Player._generate_name()
        save_path = Player._prepare_storage(PlayerBracket.ARCHIVED, save_dir, new_player_name)
        shutil.copyfile(self.save_path, save_path)
        return self.__class__(PlayerBracket.ARCHIVED, str(save_path), new_player_name, self)


@dataclass
class Match:
    """A scheduled game between two checkpoints (identified by their paths)."""
    match_id: str
    p1: str
    p2: str
    seed: int

    @classmethod
    def for_players(cls, p1: Player, p2: Player):
        match_id = f"{int(time.time())}-{random.randint(0, 99):03d}"
        return cls(match_id, p1.save_path, p2.save_path, np.random.randint(0, 1_000_000))


@dataclass
class MatchResult:
    """Outcome of a finished match, from p1's perspective."""
    p1: str
    p2: str
    wins: int
    losses: int
    draws: int
    info: Optional[Dict[str, Any]] = None

    @classmethod
    def for_match(
        cls,
        match: Match,
        result: Optional[Tuple[int, int, int]] = None,
        info: Optional[Dict[str, Any]] = None
    ) -> "MatchResult":
        wins, losses, draws = (0, 0, 0) if result is None else result
        return cls(match.p1, match.p2, wins, losses, draws, info)

    def payoff(self) -> Payoff:
        # note the argument order: Payoff is (wins, draws, losses)
        return Payoff(self.wins, self.draws, self.losses)


# xxx(okachaiev): i also need to include decay
# xxx(okachaiev): for checkpointing/display i better track MMR here as well
class PayoffTable:
    """Pairwise payoff matrix plus per-player metadata for the whole league.

    Players are keyed by their checkpoint path (str). The ``info`` ring
    buffers are transient and are dropped on pickling.
    """

    def __init__(self):
        # xxx(okachaiev): if i keep max players, i can setup matrix as np array
        # i bet that would work much faster because of nice mem layout
        self.payoffs = defaultdict(Payoff.empty)
        self.info = defaultdict(lambda: deque(maxlen=100))
        self.players = OrderedDict()
        self.player_attrs = defaultdict(dict)

    def add_player(self, new_player: Player) -> None:
        self.players[str(new_player.save_path)] = new_player

    def get_attr(self, player: Union[Player, str], name: str, default_value: Optional[Any] = None) -> Optional[Any]:
        player = player if isinstance(player, str) else str(player.save_path)
        return self.player_attrs[player].get(name, default_value)

    def set_attr(self, player: Union[str, Player], name: str, value: Any) -> None:
        player = player if isinstance(player, str) else str(player.save_path)
        self.player_attrs[player][name] = value

    def update(self, match_result: MatchResult):
        """Fold a finished match into both directed cells of the matrix."""
        self.payoffs[match_result.p1, match_result.p2] += match_result.payoff()
        self.payoffs[match_result.p2, match_result.p1] += -match_result.payoff()
        self.info[match_result.p1].append(match_result.info)
        # xxx(okachaiev): precompute winrates????

    def winrate(self, p1, p2) -> float:
        """Winrate of p1 against p2 (0.5 when the pair has not played)."""
        pid1 = str(p1.save_path) if isinstance(p1, Player) else p1
        pid2 = str(p2.save_path) if isinstance(p2, Player) else p2
        return self.payoffs[pid1, pid2].winrate

    def filter_players(
        self,
        bracket: Optional[PlayerBracket] = None,
        # xxx(okachaiev): filtering by parent should be definitely done
        # differently. the only one bracket that can have parent is ARCHIVED
        parent: Optional[Union[Player, List[Player]]] = None
    ) -> List[Player]:
        """Select players matching the given bracket and/or parent(s)."""
        opponents = []
        if parent is not None:
            parent = set([p.save_path for p in parent]) if isinstance(parent, list) else set([parent.save_path])
        for player in self.players.values():
            if bracket is not None and player.bracket != bracket:
                continue
            if parent is not None and (player.parent is None or player.parent.save_path not in parent):
                continue
            opponents.append(player)
        return opponents

    # xxx(okachaiev): this API could be easily merged with "winrates" call
    def calculate_winrates(self, player: Player, opponents: Iterator[Player]) -> np.ndarray:
        """Vector of this player's winrates against each opponent, in order."""
        return np.array([self.winrate(player, opponent) for opponent in opponents])

    def to_pandas(self):
        """Render the full winrate matrix plus per-player stats as a DataFrame."""
        all_players = list(self.players.keys())
        rows = []
        for player_name in all_players:
            winrates = self.calculate_winrates(player_name, all_players)
            num_games = [self.payoffs[player_name, p2].num_games for p2 in all_players]
            total_games = sum(num_games)
            avg_winrate = 0.5
            if total_games > 0:
                # weight each pairwise winrate by how many games produced it
                avg_winrate = np.average(winrates, weights=num_games)
            steps = self.get_attr(player_name, "steps_since_last_archive", 0)
            mmr = self.get_attr(player_name, "mmr", None)
            mmr = 0.0 if mmr is None else mmr.mu
            winrates = dict(enumerate(winrates))
            winrates.update(dict(name=player_name, num_games=total_games, mean=avg_winrate, train=steps, mmr=mmr))
            rows.append(winrates)
        static_columns = ["name", "train", "num_games", "mmr", "mean"]
        return pd.DataFrame(rows, columns=static_columns+list(range(len(all_players))))

    def __getstate__(self):
        # "info" holds a lambda-factory defaultdict of deques: not picklable
        # and transient anyway, so it is excluded from checkpoints
        return dict(
            payoffs=self.__dict__["payoffs"],
            players=self.__dict__["players"],
            player_attrs=self.__dict__["player_attrs"]
        )

    def __setstate__(self, attrs):
        self.__dict__.update(attrs)
        self.info = defaultdict(lambda: deque(maxlen=100))


class SelfplayBranch(Enum):
    """Branch taken by a MAIN_PLAYER matchup (AlphaStar's PFSP branching)."""
    ARCHIVED = 0
    VERIFY = 1
    NORMAL = 2


def choice(options: List[Player], p: np.ndarray) -> Optional[Player]:
    """Sample one player from ``options`` with probability proportional to ``p``.

    Returns None for an empty candidate list. Weights are normalized by
    their L1 norm; if all weights are zero (e.g. every winrate is 0 or 1,
    making w*(1-w) vanish) we fall back to a uniform draw instead of
    feeding NaN probabilities into np.random.choice.
    """
    if len(options) == 0:
        return None
    if len(p) == 1:
        return options[0]
    total = np.linalg.norm(p, ord=1)
    if total == 0:
        return np.random.choice(options)
    return np.random.choice(options, p=p/total)


def remove_monotonic_suffix(winrates: np.ndarray) -> np.ndarray:
    """Drop the trailing run of non-decreasing winrates, keeping the prefix
    up to (and including) the last non-decreasing step."""
    suffix = np.arange(1, len(winrates))[np.diff(winrates) >= 0]
    return winrates[:suffix.max()+1] if len(suffix) else winrates


def alphstar_pick_opponent(payoff_table: PayoffTable, player: Player) -> Optional[Player]:
    """
    This is the main league workflow. This function is executed after
    each finished match up. It's responsible for finding the next match up
    (opponents) that need to be executed.

    Purposefully keep this as a single function to have visibility on
    how the league works.

    The AlphaStar paper that describes different players in the league:
    https://www.nature.com/articles/s41586-019-1724-z

    Pseudocode for AlphaStar paper: https://github.com/chengyu2/learning_alpha_star
    The most detailed explanation of branching logic could be found here:
    https://github.com/chengyu2/learning_alpha_star/blob/master/multiagent.py

    More on population-based RL: https://arxiv.org/pdf/1807.01281.pdf
    """
    if player.bracket == PlayerBracket.ARCHIVED:
        # ARCHIVED players do not play as challengers, so this should not happen
        return None
    elif player.bracket == PlayerBracket.MAIN_EXPLOITER:
        # exploiters target the main players; once the main player gets
        # beatable (winrate > 0.1) play it directly, otherwise curriculum
        # through its archived ancestors
        potential_opponents = payoff_table.filter_players(bracket=PlayerBracket.MAIN_PLAYER)
        opponent = np.random.choice(potential_opponents)
        if payoff_table.winrate(player, opponent) > 0.1:
            return opponent
        prev_opponents = payoff_table.filter_players(bracket=PlayerBracket.ARCHIVED, parent=opponent)
        winrates = payoff_table.calculate_winrates(player, prev_opponents)
        return choice(prev_opponents, winrates*(1-winrates))
    elif player.bracket == PlayerBracket.LEAGUE_EXPLOITER:
        # play the whole archive, preferring opponents it does not beat yet
        potential_opponents = payoff_table.filter_players(bracket=PlayerBracket.ARCHIVED)
        winrates = payoff_table.calculate_winrates(player, potential_opponents)
        return choice(potential_opponents, np.minimum(0.5, 1-winrates))
    elif player.bracket == PlayerBracket.MAIN_PLAYER:
        # sample multinomial to decide between playing against archived or main player
        r = np.random.choice([SelfplayBranch.ARCHIVED, SelfplayBranch.VERIFY, SelfplayBranch.NORMAL], p=[0.5,0.15,0.35])
        if r == SelfplayBranch.ARCHIVED:
            # matchup against archived player, PFSP weighted towards hard ones
            potential_opponents = payoff_table.filter_players(bracket=PlayerBracket.ARCHIVED)
            winrates = payoff_table.calculate_winrates(player, potential_opponents)
            return choice(potential_opponents, (1-winrates)**2)
        else:
            # matchup against another main player
            potential_opponents = payoff_table.filter_players(bracket=PlayerBracket.MAIN_PLAYER)
            opponent = np.random.choice(potential_opponents)
            if r == SelfplayBranch.VERIFY:
                # check out archived exploiters
                exploiters = payoff_table.filter_players(bracket=PlayerBracket.MAIN_EXPLOITER)
                prev_exploiters = payoff_table.filter_players(bracket=PlayerBracket.ARCHIVED, parent=exploiters)
                winrates = payoff_table.calculate_winrates(player, prev_exploiters)
                if len(winrates) and winrates.min() < 0.3:
                    return choice(prev_exploiters, (1-winrates)**2)
                # previous versions of the opponent
                prev_opponents = payoff_table.filter_players(bracket=PlayerBracket.ARCHIVED, parent=opponent)
                winrates = payoff_table.calculate_winrates(player, prev_opponents)
                winrates = remove_monotonic_suffix(winrates)
                # fix: keep the candidate list aligned with the truncated
                # winrate vector, otherwise np.random.choice receives a
                # probability vector of the wrong length and raises
                prev_opponents = prev_opponents[:len(winrates)]
                if len(winrates) and winrates.min() < 0.7:
                    return choice(prev_opponents, winrates*(1-winrates))
                else:
                    return None
            elif payoff_table.winrate(player, opponent) > 0.3:
                # main player, opponent is not scary
                return opponent
            else:
                # main player, opponent is a bit hard
                potential_opponents = payoff_table.filter_players(bracket=PlayerBracket.ARCHIVED, parent=opponent)
                winrates = payoff_table.calculate_winrates(player, potential_opponents)
                return choice(potential_opponents, winrates*(1-winrates))


# xxx(okachaiev): i might be the case i need to merge "pick_opponent" and "requires_archival"
# into a single function. e.g. "on_step". though i don't want to lose clarity by doing so
def alphastar_requires_archival(args, payoff_table: PayoffTable, player: Player) -> bool:
    """
    Archival happens based on the winrate against a different set of opponents.

    A learner is archived once it trained for at least ``min_steps`` and
    either exceeded ``max_steps`` or beats its whole comparison set with a
    winrate above ``winrate_threshold``.
    """
    if player.bracket == PlayerBracket.ARCHIVED: return False
    steps = payoff_table.get_attr(player, "steps_since_last_archive", 0)
    if steps < args.config["archive"]["args"]["min_steps"]: return False
    if steps > args.config["archive"]["args"]["max_steps"]: return True
    if player.bracket == PlayerBracket.MAIN_PLAYER:
        opponents = payoff_table.filter_players(bracket=PlayerBracket.ARCHIVED)
    elif player.bracket == PlayerBracket.MAIN_EXPLOITER:
        opponents = payoff_table.filter_players(bracket=PlayerBracket.MAIN_PLAYER)
    elif player.bracket == PlayerBracket.LEAGUE_EXPLOITER:
        opponents = payoff_table.filter_players(bracket=PlayerBracket.ARCHIVED)
    else:
        # defensive default: an unexpected bracket previously left
        # ``opponents`` unbound and crashed below
        opponents = []
    winrates = payoff_table.calculate_winrates(player, opponents)
    # bool(): the original returned int 0 for an empty comparison set
    return bool(len(winrates) and winrates.min() > args.config["archive"]["args"]["winrate_threshold"])


def openfive_pick_opponent(payoff_table: PayoffTable, player: Player) -> Optional[Player]:
    """
    The agent has 80% chance to play against itself, and 20% chance to play against past self.
    """
    if player.bracket == PlayerBracket.ARCHIVED:
        # ARCHIVED players do not play as challengers, so this should not happen
        return None
    # fix: the original comparison (rand() > 0.8) gave only a 20% chance of
    # self-play, inverting the documented 80/20 split
    selfplay = np.random.rand() < 0.8
    if selfplay: return player
    opponents = payoff_table.filter_players(bracket=PlayerBracket.ARCHIVED, parent=player)
    if len(opponents):
        return choice(opponents, np.ones(len(opponents)))


def openfive_requires_archival(args, payoff_table: PayoffTable, player: Player) -> bool:
    """
    Archive each learning agent right after num_steps of training.
    """
    if player.bracket == PlayerBracket.ARCHIVED: return False
    steps = payoff_table.get_attr(player, "steps_since_last_archive", 0)
    return steps >= args.config["archive"]["args"]["num_steps"]


def random_bootstrap_opponent(args, player: Player, initial_players: List[Player]) -> Optional[List[Player]]:
    """Pick ``num_opponents`` initial opponents uniformly (with replacement)
    for a learner; archived players get no bootstrap matches."""
    if not player.learner: return None
    # xxx(okachaiev): should the player itself being masked?
    return np.random.choice(initial_players, args.config["matchmaking"]["bootstrap_args"]["num_opponents"])


def noop_bootstrap_opponent(args, player: Player, initial_players: List[Player]) -> Optional[List[Player]]:
    """Bootstrap strategy that schedules no initial matches."""
    return None


def league_checkpoint(args, payoff_table: PayoffTable) -> None:
    """Persist the payoff table and delete older checkpoints only after the
    new one has been written successfully."""
    existing_files = list(args.checkpoints_folder.glob("*.pcl"))
    # xxx(okachaiev): should prefix be customizable?
    checkpoint_file = f"league-{int(time.time())}-{random.randint(1_000,10_000)}.pcl"
    checkpoint_path = args.checkpoints_folder.joinpath(checkpoint_file)

    print(f"Checkpointing league into {checkpoint_path}")

    with checkpoint_path.open("wb") as f:
        pickle.dump(payoff_table, f, pickle.HIGHEST_PROTOCOL)
    for prev_checkpoint in existing_files:
        prev_checkpoint.unlink()


def league_resume_from_checkpoint(args) -> Optional[PayoffTable]:
    """Load the most recent league checkpoint, or None when there is none."""
    if not args.checkpoints_folder.exists(): return None
    checkpoints = list(args.checkpoints_folder.glob("*.pcl"))
    if not checkpoints: return None

    # fix: glob order is arbitrary; pick the newest checkpoint explicitly
    # (normally only one survives league_checkpoint's cleanup, but a crash
    # between dump and unlink can leave several behind)
    checkpoint = max(checkpoints, key=lambda p: p.stat().st_mtime)

    print(f"Resuming operations from checkpoint {checkpoint}")

    with checkpoint.open("rb") as f:
        return pickle.load(f)


if __name__ == "__main__":
    args = parse_args()

    (
        league_bootstrap_opponent,
        league_pick_opponent,
        league_requires_archival,
        league_requires_evaluation
    ) = load_algorithms(args)

    # before doing the training, let's make sure that evaluation league is ready
    prepare_eval_reference(args)

    # load from checkpoint (if any)
    payoff_table = league_resume_from_checkpoint(args) if args.resume else None
    if payoff_table is None:
        payoff_table = PayoffTable()

    # load league players, starting from bootstrap
    # NOTE(review): when resuming from a checkpoint this re-creates the
    # initial population on top of the restored one — confirm intended
    for agent_model_path in args.config["population"]["initial_agents"]:
        for group in args.config["population"]["structure"]:
            for _ in range(group["num_agents"]):
                bracket = PlayerBracket.from_str(group["group"])
                player = Player.from_model(agent_model_path, bracket=bracket, save_dir=args.agents_folder)
                payoff_table.add_player(player)
                if group["init_archive"]:
                    payoff_table.add_player(player.archive(save_dir=args.agents_folder))

    ctx = mp.get_context("forkserver")
    executor = ProcessPoolExecutor(max_workers=args.config["train"]["num_workers"], mp_context=ctx, initializer=init_worker)
    eval_executor = ProcessPoolExecutor(max_workers=args.config["evaluate"]["num_workers"], mp_context=ctx, initializer=init_worker)
    manager = mp.Manager()
    shutdown_flag = manager.Event()

    def sigint_handler(signum, frame):
        # renamed param: the original shadowed the ``signal`` module
        print("Interrupted, stopping executors")
        # xxx(okachaiev): this won't kill already running training processes
        shutdown_flag.set()
        executor.shutdown(cancel_futures=True)
        eval_executor.shutdown(cancel_futures=True)
        exit(0)

    signal.signal(signal.SIGINT, sigint_handler)
    signal.signal(signal.SIGTERM, sigint_handler)

    # league loop
    print(f"Staring league with {len(payoff_table.players)} initial players")

    initial_players = list(payoff_table.players.values())
    # xxx(okachaiev): using dicts for mutable values is ... weird
    # would be much better if I can put it into a "League" class or something
    num_scheduled_matches, played_matches = dict(value=0), dict(value=0)
    scheduled_futures = set()

    def process_match_result(future, scheduled_futures):
        """Handle one completed future: fold results into the payoff table,
        trigger evaluation/archival, checkpoint, and schedule the next match."""
        if future.cancelled(): return

        match_result = future.result()
        if match_result is None:
            return

        if isinstance(match_result, EvaluationResult):
            payoff_table.set_attr(match_result.p1, "mmr", match_result.mmr)
            league_checkpoint(args, payoff_table)
            return

        played_matches["value"] += 1
        payoff_table.update(match_result)

        # visualize into terminal
        # xxx(okachaiev): use writer to update TensorBoard/W&B
        with pd.option_context("display.float_format", "{:,.4f}".format):
            print(payoff_table.to_pandas())

        winrate = payoff_table.winrate(match_result.p1, match_result.p2)

        print(f"Updated winrate {match_result.p1} vs. {match_result.p2}: {winrate}")

        player = payoff_table.players[match_result.p1]

        # needs evaluation?
        if league_requires_evaluation(args, payoff_table, player):
            print(f"Scheduled evaluation for {player.save_path}...")
            scheduled_futures.add(eval_executor.submit(run_eval_loop, args, player, shutdown_flag))

        # ready to be archived?
        if league_requires_archival(args, payoff_table, player):
            print(f"Archiving {player}...")
            payoff_table.add_player(player.archive(save_dir=args.agents_folder))
            payoff_table.set_attr(player, "steps_since_last_archive", 0)
        else:
            steps = payoff_table.get_attr(player, "steps_since_last_archive", 0)
            steps += args.config["train"]["args"]["total_timesteps"]
            payoff_table.set_attr(player, "steps_since_last_archive", steps)

        league_checkpoint(args, payoff_table)

        # xxx(okachaiev): if we only produce 1 game or None, number of game
        # will decrease overtime :thinking: in this case i need to pick
        # a random pair of agents to match up
        next_opponent = league_pick_opponent(payoff_table, player)
        if next_opponent:
            schedule(Match.for_players(player, next_opponent), scheduled_futures)

    def schedule(match, scheduled_matches):
        """Submit a match to the training pool unless the league cap is reached."""
        if num_scheduled_matches["value"] >= args.config["league"]["max_matches"]: return

        print(f"Queued match {match.p1} vs. {match.p2}")

        future = executor.submit(run_train_loop, args, match)
        scheduled_matches.add(future)
        num_scheduled_matches["value"] += 1

    # xxx(okachaiev): when resumed from checkpoint, do I need to run initial games?
    # i probably need to keep flag that initialization is finished and we should switch
    # to a normal "pick opponent" procedure
    for player in initial_players:
        # xxx(okachaiev): is there a need to have option to get all players vs.all players API?
        opponents = league_bootstrap_opponent(args, player, initial_players)
        if opponents is not None:
            for opponent in opponents:
                schedule(Match.for_players(player, opponent), scheduled_futures)

    while len(scheduled_futures):
        done, scheduled_futures = concurrent.futures.wait(scheduled_futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for match_result in done:
            process_match_result(match_result, scheduled_futures)

    shutdown_flag.set()
    executor.shutdown(wait=True, cancel_futures=False)
    eval_executor.shutdown(wait=True, cancel_futures=False)