Skip to content

Update to pygad.py to use multiprocessing of generations #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 78 additions & 6 deletions pygad.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pickle
import time
import warnings
import concurrent.futures

class GA:

Expand Down Expand Up @@ -47,7 +48,10 @@ def __init__(self,
save_best_solutions=False,
save_solutions=False,
suppress_warnings=False,
stop_criteria=None):
stop_criteria=None,
use_multiprocess=False,
max_workers=2
):

"""
The constructor of the GA class accepts all parameters required to create an instance of the GA class. It validates such parameters.
Expand Down Expand Up @@ -105,6 +109,10 @@ def __init__(self,
allow_duplicate_genes: Added in PyGAD 2.13.0. If True, then a solution/chromosome may have duplicate gene values. If False, then each gene will have a unique value in its solution.

stop_criteria: Added in PyGAD 2.15.0. It is assigned to some criteria to stop the evolution if at least one criterion holds.

use_multiprocess: Added Dec 30, 2021: Whether or not to use concurrent.futures MultiprocessPoolExecutor to go through each chromosome of a population in a generation.

max_workers: Added Dec 30, 2021: How many cpu cores to use if "use_multiprocess" is set to "True"
"""

# If suppress_warnings is bool and its valud is False, then print warning messages.
Expand Down Expand Up @@ -869,6 +877,12 @@ def __init__(self,
self.valid_parameters = False
raise TypeError("The expected value of the 'stop_criteria' is a single string or a list/tuple/numpy.ndarray of strings but the value {stop_criteria_val} of type {stop_criteria_type} found.".format(stop_criteria_val=stop_criteria, stop_criteria_type=type(stop_criteria)))

# Added warning for multiprocessing
self.use_multiprocess = use_multiprocess
self.max_workers = max_workers
if self.use_multiprocess == False and self.max_workers != 2:
if not self.suppress_warnings: warnings.warn("You have designated a non-default number of workers, but 'use_multiprocess' is set to False. Multiprocessing will not be used in this run. Set 'use_multiprocess' to True if you want to use those workers that you set.")

# The number of completed generations.
self.generations_completed = 0

Expand Down Expand Up @@ -1131,6 +1145,54 @@ def initialize_population(self, low, high, allow_duplicate_genes, mutation_by_re
# Keeping the initial population in the initial_population attribute.
self.initial_population = self.population.copy()


def cal_pop_fitness_multiprocess(self):

"""
Calculating the fitness values of all solutions in the current population, using multiprocessing.
It returns:
-fitness: An array of the calculated fitness values.
"""

if self.valid_parameters == False:
raise ValueError("ERROR calling the cal_pop_fitness_multiprocess() method: \nPlease check the parameters passed while creating an instance of the GA class.\n")

# Create a list of len(self.population) full of negative inf's
pop_fitness = [-numpy.inf] * len(self.population)

# First, do the same check if the parent's fitness value are already calculated.
# If so, use it instead of calling the fitness function.
for sol_idx, sol in enumerate(self.population):
if not (self.last_generation_parents is None) and len(numpy.where(numpy.all(self.last_generation_parents == sol, axis=1))[0] > 0):
# Index of the parent in the parents array (self.last_generation_parents). This is not its index within the population.
parent_idx = numpy.where(numpy.all(self.last_generation_parents == sol, axis=1))[0][0]
# Index of the parent in the population.
parent_idx = self.last_generation_parents_indices[parent_idx]
# Use the parent's index to return its pre-calculated fitness value.
fitness = self.last_generation_fitness[parent_idx]
# NOTE: this is why we create the list ahead of time, so we can make use of the sol_idx
# for fitnesses already determined
pop_fitness[sol_idx] = fitness

# Start a process pool for each population member
with concurrent.futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
# Don't do the ones we just checked above, based on the -numpy.inf in the pop_fitness list (again, using sol_idx)
# NOTE: Seeing as some fitnesses may finish quicker than others re: the process pool, I needed a way to still use
# the appropriate sol_idx for the particular chromosome being tested, and the easiest way I knew to do this was
# to have the fitness_func return the fitness, as well as the sol_idx that was passed into it. That way, regardless
# of which chromosome comes out of the fitness func first, it'll still be placed in its correct spot in the
# pop_fitness list. So, if using multiprocess, the fitness_func must "return fitness, solution_idx"
results = [executor.submit(self.fitness_func, sol, sol_idx) for sol_idx, sol in enumerate(self.population) if pop_fitness[sol_idx] != -numpy.inf]
for f in concurrent.futures.as_completed(results):
solution_fitness = f.result()[0]
solution_idx = f.result()[1]
pop_fitness[solution_idx] = solution_fitness

# Same as default
pop_fitness = numpy.array(pop_fitness)

return pop_fitness

def cal_pop_fitness(self):

"""
Expand Down Expand Up @@ -1184,7 +1246,10 @@ def run(self):
stop_run = False

# Measuring the fitness of each chromosome in the population. Save the fitness in the last_generation_fitness attribute.
self.last_generation_fitness = self.cal_pop_fitness()
if self.use_multiprocess == False:
self.last_generation_fitness = self.cal_pop_fitness()
elif self.use_multiprocess == True:
self.last_generation_fitness = self.cal_pop_fitness_multiprocess()

best_solution, best_solution_fitness, best_match_idx = self.best_solution(pop_fitness=self.last_generation_fitness)

Expand Down Expand Up @@ -1258,7 +1323,10 @@ def run(self):
self.generations_completed = generation + 1 # The generations_completed attribute holds the number of the last completed generation.

# Measuring the fitness of each chromosome in the population. Save the fitness in the last_generation_fitness attribute.
self.last_generation_fitness = self.cal_pop_fitness()
if self.use_multiprocess == False:
self.last_generation_fitness = self.cal_pop_fitness()
elif self.use_multiprocess == True:
self.last_generation_fitness = self.cal_pop_fitness_multiprocess()

best_solution, best_solution_fitness, best_match_idx = self.best_solution(pop_fitness=self.last_generation_fitness)

Expand Down Expand Up @@ -3111,8 +3179,12 @@ def best_solution(self, pop_fitness=None):

# Getting the best solution after finishing all generations.
# At first, the fitness is calculated for each solution in the final generation.
if pop_fitness is None:
pop_fitness = self.cal_pop_fitness()
if self.use_multiprocess == False:
if pop_fitness is None:
pop_fitness = self.cal_pop_fitness()
elif self.use_multiprocess == True:
if pop_fitness is None:
pop_fitness = self.cal_pop_fitness_multiprocess()
# Then return the index of that solution corresponding to the best fitness.
best_match_idx = numpy.where(pop_fitness == numpy.max(pop_fitness))[0][0]

Expand Down Expand Up @@ -3481,4 +3553,4 @@ def load(filename):
raise FileNotFoundError("Error reading the file {filename}. Please check your inputs.".format(filename=filename))
except:
raise BaseException("Error loading the file. If the file already exists, please reload all the functions previously used (e.g. fitness function).")
return ga_in
return ga_in