Skip to content

Commit

Permalink
Merge pull request #53 from alan-turing-institute/test-update
Browse files Browse the repository at this point in the history
Better tests, better `__main__.py`, no duplicates on teams
  • Loading branch information
phinate authored Jun 28, 2023
2 parents 94233d1 + b61ed71 commit f7dd223
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 132 deletions.
97 changes: 92 additions & 5 deletions src/p2lab/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,58 @@

import argparse
import asyncio
from pathlib import Path

import numpy as np

from p2lab.genetic.genetic import genetic_algorithm
from p2lab.genetic.operations import (
build_crossover_fn,
locus_swap,
sample_swap,
slot_swap,
)
from p2lab.pokemon.premade import gen_1_pokemon
from p2lab.pokemon.teams import generate_teams, import_pool


async def main_loop(num_teams, team_size, num_generations, unique):
async def main_loop(
num_teams,
team_size,
num_generations,
battles_per_match,
unique,
crossover,
p1,
p2,
write_every,
write_path,
):
if write_path is not None:
write_path = Path(write_path)
if not write_path.exists():
write_path.mkdir()
# generate the pool
pool = import_pool(gen_1_pokemon())
seed_teams = generate_teams(pool, num_teams, team_size, unique=unique)
# crossover_fn = build_crossover_fn(locus_swap, locus=0)
function_map = {
"sample": sample_swap,
"slot": slot_swap,
"locus": locus_swap,
}
crossover_fn = (
build_crossover_fn(function_map[crossover]) if crossover is not None else None
)

# log the parameters
print("Running genetic algorithm with the following parameters:")
print(f"Number of teams: {num_teams}")
print(f"Team size: {team_size}")
print(f"Number of generations: {num_generations}")
print(f"Unique teams: {unique}")
print(f"Crossover: {crossover if crossover is not None else 'none'}")
print(f"Player 1: {p1}")
print(f"Player 2: {p2}")
# run the genetic algorithm
teams, fitnesses = await genetic_algorithm(
pokemon_pool=pool,
Expand All @@ -23,8 +62,14 @@ async def main_loop(num_teams, team_size, num_generations, unique):
team_size=team_size,
num_generations=num_generations,
progress_bars=True,
mutate_with_fitness=True,
mutate_k=1,
mutate_with_fitness=crossover_fn is None,
crossover_fn=crossover_fn,
mutate_k=team_size - 1,
player_1_name=p1,
player_2_name=p2,
battles_per_match=battles_per_match,
write_every=write_every,
write_path=write_path,
)

print("Best team:")
Expand Down Expand Up @@ -54,7 +99,7 @@ def parse_args():
default=10,
)
parser.add_argument(
"--team-size", help="Number of pokemon per team (max 6)", type=int, default=2
"--team-size", help="Number of pokemon per team (max 6)", type=int, default=3
)
parser.add_argument(
"--teams",
Expand All @@ -68,6 +113,42 @@ def parse_args():
help="Determines if a team can have duplicate pokemon species",
default=True,
)
parser.add_argument(
"--crossover",
help="Determines which crossover function to use",
choices=["locus", "slot", "sample"],
default=None,
)
parser.add_argument(
"--p1",
help="Name of the first player",
type=str,
default="Player 1",
)
parser.add_argument(
"--p2",
help="Name of the second player",
type=str,
default="Player 2",
)
parser.add_argument(
"--battles-per-match",
help="Number of battles per match",
type=int,
default=3,
)
parser.add_argument(
"--write-every",
help="Write every N generations",
type=int,
default=10,
)
parser.add_argument(
"--write-path",
help="Path to write to",
type=str,
default="./results",
)
return vars(parser.parse_args())


Expand All @@ -82,6 +163,12 @@ def main():
team_size=args["team_size"],
num_generations=args["generations"],
unique=args["unique"],
crossover=args["crossover"],
p1=args["p1"],
p2=args["p2"],
battles_per_match=args["battles_per_match"],
write_every=args["write_every"],
write_path=args["write_path"],
)
)

Expand Down
37 changes: 33 additions & 4 deletions src/p2lab/genetic/genetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

__all__ = ("genetic_algorithm",)

from pathlib import Path
from typing import TYPE_CHECKING, Callable

import numpy as np
from poke_env import PlayerConfiguration
from poke_env.player import SimpleHeuristicsPlayer

Expand All @@ -16,7 +18,6 @@
from p2lab.pokemon.teams import Team


# TODO: account for team size of 1
async def genetic_algorithm(
pokemon_pool: list[str], # list of all valid pokemon names
seed_teams: list[Team], # list of teams to seed the algorithm with
Expand All @@ -35,6 +36,12 @@ async def genetic_algorithm(
num_generations: int = 500,
fitness_kwargs: dict | None = None,
progress_bars: bool = True,
player_1_name: str = "Player 1",
player_2_name: str = "Player 2",
print_top_teams: bool = True,
player_log_level: int = 30,
write_every: int | None = None,
write_path: str | None = None,
) -> Team:
"""
A genetic evolution algorithm for optimising pokemon team selection.
Expand Down Expand Up @@ -88,10 +95,14 @@ async def genetic_algorithm(
matches = match_fn(seed_teams)

player_1 = SimpleHeuristicsPlayer(
PlayerConfiguration("Player 1", None), battle_format=battle_format
PlayerConfiguration(player_1_name, None),
battle_format=battle_format,
log_level=player_log_level,
)
player_2 = SimpleHeuristicsPlayer(
PlayerConfiguration("Player 2", None), battle_format=battle_format
PlayerConfiguration(player_2_name, None),
battle_format=battle_format,
log_level=player_log_level,
)

print("Generation 0:")
Expand Down Expand Up @@ -169,7 +180,25 @@ async def genetic_algorithm(
# Generate matches from list of teams
matches = match_fn(teams)

print(f"Generation {i + 1}:")
if print_top_teams:
print(f"Generation {i + 1}:")
print(f"Top team: {teams[np.argmax(fitness)].names}")
print(f"Top fitness: {fitness[np.argmax(fitness)]}")

if write_every is not None and i % write_every == 0:
if not Path.exists(Path(write_path) / Path(f"generation_{i}")):
Path.mkdir(Path(write_path) / Path(f"generation_{i}"))
sorted_teams = sorted(
teams, key=lambda x: fitness[teams.index(x)], reverse=True
)
for j, team in enumerate(sorted_teams[:5]):
team_path = (
Path(write_path)
/ Path(f"generation_{i}")
/ Path(f"team_{j}_fitness_{fitness[j]:.3f}.txt")
)
print(team)
team.to_file(team_path)

# Run simulations
results = await run_battles(
Expand Down
107 changes: 82 additions & 25 deletions src/p2lab/genetic/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def selection(
"""

# Sample indices with replacement to produce new teams + fitnesses
old_indices = list(range(num_teams))
old_indices = list(range(len(teams)))
new_indices = random.choices(old_indices, k=num_teams)

# New teams and fitness
Expand Down Expand Up @@ -230,21 +230,22 @@ def sample_swap(
team1: list[str],
team2: list[str],
num_pokemon: int,
with_replacement: bool = True,
**kwargs,
) -> tuple(list[str], list[str]):
"""
A method of performing the crossover. This method treats the pokemon
in the two teams as a population and samples from them to create two new
teams.
Can be done with or without replacement. Generally more interesting with
replacement and so defaults to with replacement.
Args:
team1: List of pokemon in team 1
team2: List of pokemon in team 2
num_pokemon: Number of pokemon in each team
with_replacement: Whether to sample with our without replacement.
Returns:
Two new teams
note: replacement is set to False to enforce the same pokemon cannot be in both teams
"""

# Population to sample from and indices
Expand All @@ -255,23 +256,37 @@ def sample_swap(
team1_indices = np.random.choice(
indices,
size=num_pokemon,
replace=with_replacement,
replace=False,
)

# For team 2, change behaviour conditional on replacement
if with_replacement:
team2_indices = list(
np.random.choice(
indices,
size=num_pokemon,
replace=with_replacement,
)
team1_pokemon = population[team1_indices]
team1_names = [p.formatted.split("|")[0] for p in team1_pokemon]

# Get indices for team 2, which are just the indices not in team 1
team2_indices = list(set(indices) - set(team1_indices))
team2_pokemon = population[team2_indices]
team2_names = [p.formatted.split("|")[0] for p in team2_pokemon]

# ensure we don't sample the same pokemon twice on either team
while len(set(team1_names)) != len(team1_names) or len(set(team2_names)) != len(
team2_names
):
print("Found duplicate pokemon, resampling...")
team1_indices = np.random.choice(
indices,
size=num_pokemon,
replace=False,
)
else:

team1_pokemon = population[team1_indices]
team1_names = [p.formatted.split("|")[0] for p in team1_pokemon]

team2_indices = list(set(indices) - set(team1_indices))
team2_pokemon = population[team2_indices]
team2_names = [p.formatted.split("|")[0] for p in team2_pokemon]

# Return teams
return list(population[team1_indices]), list(population(team2_indices))
return list(team1_pokemon), list(team2_pokemon)


### Mutation Operations
Expand All @@ -297,6 +312,7 @@ def mutate(
k: Number of team members to mutate. If set to None, this number will
be random.
"""
new_teams = []
for team in teams:
# Each team faces a random chance of mutation
if np.random.choice([True, False], size=None, p=[mutate_prob, 1 - mutate_prob]):
Expand All @@ -308,14 +324,33 @@ def mutate(
k = random.sample(range(n, num_pokemon - n), k=1)[0]

# Randomly swap k members of the team out with pokemon from the general pop
# IMPORTANT: ensure that no team has the same pokemon in it
mutate_indices = np.random.choice(range(num_pokemon), size=k, replace=False)

new_pokemon = np.random.choice(
pokemon_population, size=k, replace=True
pokemon_population,
size=k,
replace=False, # replace would create duplicates
) # open to parameterising the replace
team.pokemon[mutate_indices] = new_pokemon
# check that these new pokemon are not already in the team
names = [p.formatted.split("|")[0] for p in new_pokemon]
while any(name in team.names for name in names):
print("Found duplicate pokemon, resampling...")
new_pokemon = np.random.choice(
pokemon_population,
size=k,
replace=False, # replace would create duplicates
)
names = [p.formatted.split("|")[0] for p in new_pokemon]
# Create new team with the mutated pokemon and the rest of the team
old_pokemon = np.array(team.pokemon)[
[i for i in range(num_pokemon) if i not in mutate_indices]
]
new_team = [*new_pokemon, *old_pokemon]
new_teams.append(Team(new_team))
else:
new_teams.append(team)

return teams
return new_teams


def fitness_mutate(
Expand Down Expand Up @@ -343,6 +378,8 @@ def fitness_mutate(
k: Number of team members to mutate. If set to None, this number will
be random.
"""
new_teams = []

for index, team in enumerate(teams):
# Each team faces a random chance of mutation
if np.random.choice(
Expand All @@ -356,10 +393,30 @@ def fitness_mutate(
k = random.sample(range(n, num_pokemon - n), k=1)[0]

# Randomly swap k members of the team out with pokemon from the general pop
# IMPORTANT: ensure that no team has the same pokemon in it
mutate_indices = np.random.choice(range(num_pokemon), size=k, replace=False)
new_pokemon = np.random.choice(
pokemon_population, size=k, replace=True
pokemon_population,
size=k,
replace=False, # replace would create duplicates
) # open to parameterising the replace
team.pokemon[mutate_indices] = new_pokemon

return teams
# check that these new pokemon are not already in the team
names = [p.formatted.split("|")[0] for p in new_pokemon]
while any(name in team.names for name in names):
print("Found duplicate pokemon, resampling...")
new_pokemon = np.random.choice(
pokemon_population,
size=k,
replace=False, # replace would create duplicates
)
names = [p.formatted.split("|")[0] for p in new_pokemon]
# Create new team with the mutated pokemon and the rest of the team
old_pokemon = np.array(team.pokemon)[
[i for i in range(num_pokemon) if i not in mutate_indices]
]
new_team = [*new_pokemon, *old_pokemon]
new_teams.append(Team(new_team))
else:
new_teams.append(team)

return new_teams
Loading

0 comments on commit f7dd223

Please sign in to comment.