import logging
from collections import namedtuple
import numpy as np
from l2l import dict_to_list, list_to_dict
from l2l.optimizers.optimizer import Optimizer
# Module-level logger used by the optimizer below.
logger = logging.getLogger("optimizers.evolutionstrategies")

# Immutable bundle of the hyper-parameters consumed by `EvolutionStrategiesOptimizer`.
# The namedtuple declares no defaults, so every field has to be supplied by the caller.
EvolutionStrategiesParameters = namedtuple('EvolutionStrategiesParameters', [
    'learning_rate',
    'noise_std',
    'mirrored_sampling_enabled',
    'fitness_shaping_enabled',
    'pop_size',
    'n_iteration',
    'stop_criterion',
    'seed',
])
EvolutionStrategiesParameters.__doc__ = """
:param learning_rate: Learning rate
:param noise_std: Standard deviation of the perturbations added to the current individual (the perturbations
    have 0 mean). Either a scalar or an array with the same shape as the flattened individual.
:param mirrored_sampling_enabled: Should we turn on mirrored sampling i.e. sampling both e and -e
:param fitness_shaping_enabled: Should we turn on fitness shaping i.e. using rank-based utilities instead of the
    raw fitnesses to update the current individual?
:param pop_size: Number of perturbations sampled per generation. With mirrored sampling twice as many perturbed
    individuals are evaluated. The unperturbed current individual is always evaluated in addition.
:param n_iteration: Number of iterations to perform
:param stop_criterion: Stop if this fitness is reached. The field is required; pass ``np.inf`` to never stop early.
:param seed: The random seed used for generating new individuals
"""
class EvolutionStrategiesOptimizer(Optimizer):
    """
    Class implementing the evolution strategies optimizer

    as in: Salimans, T., Ho, J., Chen, X. & Sutskever, I. Evolution Strategies as a Scalable Alternative to
    Reinforcement Learning. arXiv:1703.03864 [cs, stat] (2017).

    In pseudo code the algorithm does:

    For n iterations do:
      - Perturb the current individual by adding a value with 0 mean and `noise_std` standard deviation
      - If mirrored sampling is enabled, also perturb the current individual by subtracting the same values that
        were added in the previous step
      - Evaluate the individuals (and the unperturbed current individual) and get their fitness
      - Update the current individual as

            theta_{t+1} <- theta_t + alpha * sum{F_i * e_i} / (n * sigma^2)

        where F_i is the fitness and e_i is the perturbation
      - If fitness shaping is enabled, F_i is replaced with the utility u_i in the previous step, which is
        calculated as:

            u_i = max(0, log(n/2 + 1) - log(k)) / sum_{k=1}^{n}{max(0, log(n/2 + 1) - log(k))} - 1 / n

        where k and i are the indices of the individuals in descending order of fitness F_i

        As in the paper: Wierstra, D. et al. Natural Evolution Strategies. Journal of Machine Learning Research 15,
        949-980 (2014).

    NOTE: This is not the most efficient implementation in terms of communication, since the new parameters are
    communicated to the individuals rather than the seed as in the paper.

    :param  ~l2l.utils.trajectory.Trajectory traj:
      Use this trajectory to store the parameters of the specific runs. The parameters should be
      initialized based on the values in `parameters`

    :param optimizee_create_individual:
      Function that creates a new individual. All parameters of the Individual-Dict returned should be
      of numpy.float64 type

    :param optimizee_fitness_weights:
      Fitness weights. The fitness returned by the Optimizee is multiplied by these values (one for each
      element of the fitness vector)

    :param parameters:
      Instance of :func:`~collections.namedtuple` :class:`.EvolutionStrategiesParameters` containing the
      parameters needed by the Optimizer

    :param optimizee_bounding_func:
      (Optional) Function applied to every Individual-Dict of a new generation before it is evaluated
    """

    def __init__(self,
                 traj,
                 optimizee_create_individual,
                 optimizee_fitness_weights,
                 parameters,
                 optimizee_bounding_func=None):
        super().__init__(
            traj,
            optimizee_create_individual=optimizee_create_individual,
            optimizee_fitness_weights=optimizee_fitness_weights,
            parameters=parameters,
            optimizee_bounding_func=optimizee_bounding_func)

        self.optimizee_bounding_func = optimizee_bounding_func

        if parameters.pop_size < 1:
            # ValueError is a subclass of Exception, so existing `except Exception` handlers keep working
            raise ValueError("pop_size needs to be greater than 0")

        # The following parameters are recorded
        traj.f_add_parameter('learning_rate', parameters.learning_rate, comment='Learning rate')
        traj.f_add_parameter('noise_std', parameters.noise_std, comment='Standard deviation of noise')
        traj.f_add_parameter(
            'mirrored_sampling_enabled',
            parameters.mirrored_sampling_enabled,
            comment='Flag to enable mirrored sampling')
        traj.f_add_parameter(
            'fitness_shaping_enabled', parameters.fitness_shaping_enabled, comment='Flag to enable fitness shaping')
        traj.f_add_parameter(
            'pop_size', parameters.pop_size, comment='Number of minimal individuals simulated in each run')
        traj.f_add_parameter('n_iteration', parameters.n_iteration, comment='Number of iterations to run')
        traj.f_add_parameter(
            'stop_criterion', parameters.stop_criterion, comment='Stop if best individual reaches this fitness')
        traj.f_add_parameter(
            'seed', np.uint32(parameters.seed), comment='Seed used for random number generation in optimizer')

        self.random_state = np.random.RandomState(traj.parameters.seed)

        # The individual is handled as a flat array inside the optimizer; the dict spec allows converting back
        self.current_individual_arr, self.optimizee_individual_dict_spec = dict_to_list(
            self.optimizee_create_individual(), get_dict_spec=True)

        # noise_std is either a scalar or gives one standard deviation per entry of the individual
        noise_std_shape = np.array(parameters.noise_std).shape
        assert noise_std_shape == () or noise_std_shape == self.current_individual_arr.shape, \
            "noise_std must be a scalar or have the same shape as the individual"

        traj.f_add_derived_parameter(
            'dimension',
            self.current_individual_arr.shape,
            comment='The dimension of the parameter space of the optimizee')

        # Added a generation-wise parameter logging
        traj.results.f_add_result_group(
            'generation_params',
            comment='This contains the optimizer parameters that are'
                    ' common across a generation')

        # The following parameters are recorded as generation parameters i.e. once per generation
        self.g = 0  # the current generation
        self.pop_size = parameters.pop_size
        self.best_fitness_in_run = -np.inf
        self.best_individual = None

        # The first generation consists of perturbations of the individual returned by
        # `optimizee_create_individual`, followed by that (unperturbed) individual itself.
        self._create_eval_pop(traj)

        self._expand_trajectory(traj)

    def _get_perturbations(self, traj):
        """
        Draw the perturbations for one generation.

        :param traj: Trajectory from which `pop_size`, `noise_std` and `mirrored_sampling_enabled` are read
        :return: Array with one perturbation per row. It has `pop_size` rows, or `2 * pop_size` rows (the samples
            followed by their negations) when mirrored sampling is enabled.
        """
        pop_size, noise_std, mirrored_sampling_enabled = traj.pop_size, traj.noise_std, traj.mirrored_sampling_enabled
        perturbations = noise_std * self.random_state.randn(pop_size, *self.current_individual_arr.shape)
        if mirrored_sampling_enabled:
            return np.vstack((perturbations, -perturbations))
        return perturbations

    def _create_eval_pop(self, traj):
        """
        Sample new perturbations and build the population that is evaluated next.

        Sets `current_perturbations`, `eval_pop` (list of Individual-Dicts) and `eval_pop_arr` (the same individuals
        as an array). The unperturbed current individual is always the LAST entry of the population.
        """
        self.current_perturbations = self._get_perturbations(traj)
        perturbed_individuals = (self.current_individual_arr + self.current_perturbations).tolist()

        self.eval_pop = [list_to_dict(ind, self.optimizee_individual_dict_spec) for ind in perturbed_individuals]
        self.eval_pop.append(list_to_dict(self.current_individual_arr, self.optimizee_individual_dict_spec))

        # Bounding function has to be applied AFTER the individual has been converted to a dict
        if self.optimizee_bounding_func is not None:
            self.eval_pop = [self.optimizee_bounding_func(ind) for ind in self.eval_pop]

        self.eval_pop_arr = np.array([dict_to_list(ind) for ind in self.eval_pop])

    @staticmethod
    def _rank_utilities(n_individuals):
        """
        Compute the fitness-shaping utilities for `n_individuals` individuals sorted by descending fitness.

        u_k = max(0, log(n/2 + 1) - log(k)) / sum_j max(0, log(n/2 + 1) - log(j)) - 1/n  for rank k = 1..n

        :return: Array of length `n_individuals`; its entries sum to zero up to floating point error.
        """
        ranks = np.arange(1, n_individuals + 1)
        utilities = np.maximum(0., np.log((n_individuals / 2) + 1) - np.log(ranks))
        utilities /= np.sum(utilities)
        utilities -= (1. / n_individuals)
        return utilities

    def post_process(self, traj, fitnesses_results):
        """
        See :meth:`~l2l.optimizers.optimizer.Optimizer.post_process`

        Stores the individuals and fitnesses of the evaluated generation in the trajectory, moves the current
        individual along the estimated gradient and, unless the last iteration or the stop criterion has been
        reached, creates the next generation.

        :param traj: The trajectory used to store results and to read the optimizer parameters
        :param fitnesses_results: List of `(run_index, fitness)` pairs of the evaluated generation. The list is
            cleared in place by this method.
        """
        n_iteration, stop_criterion, learning_rate, noise_std, fitness_shaping_enabled = \
            traj.n_iteration, traj.stop_criterion, traj.learning_rate, traj.noise_std, traj.fitness_shaping_enabled

        weighted_fitness_list = []
        # **************************************************************************************************************
        # Storing run-information in the trajectory
        # Reading fitnesses and performing distribution update
        # **************************************************************************************************************
        # NOTE(review): the code below relies on `fitnesses_results` being ordered like `self.eval_pop`
        # (i.e. by ind_idx, current individual last) -- confirm against the caller.
        for run_index, fitness in fitnesses_results:
            # We need to convert the current run index into an ind_idx
            # (index of individual within one generation)
            traj.v_idx = run_index
            ind_index = traj.par.ind_idx

            traj.f_add_result('$set.$.individual', self.eval_pop[ind_index])
            traj.f_add_result('$set.$.fitness', fitness)

            weighted_fitness_list.append(np.dot(fitness, self.optimizee_fitness_weights))
        traj.v_idx = -1  # set trajectory back to default

        weighted_fitness_list = np.array(weighted_fitness_list).ravel()

        # NOTE: It is necessary to clear the fitnesses_results to clear the data in the reference, and del
        # is used to make sure it's not used in the rest of this function
        fitnesses_results.clear()
        del fitnesses_results

        # Last fitness is for the previous `current_individual_arr`. It has to be read BEFORE it is cut off,
        # otherwise the fitness of the last perturbed individual would be recorded instead.
        current_individual_fitness = weighted_fitness_list[-1]
        weighted_fitness_list = weighted_fitness_list[:-1]

        # Performs descending arg-sort of weighted fitness
        fitness_sorting_indices = np.argsort(weighted_fitness_list)[::-1]

        # Sorting the data according to fitness
        sorted_population = self.eval_pop_arr[fitness_sorting_indices]
        sorted_fitness = weighted_fitness_list[fitness_sorting_indices]
        sorted_perturbations = self.current_perturbations[fitness_sorting_indices]

        self.best_individual = list_to_dict(sorted_population[0], self.optimizee_individual_dict_spec)
        self.best_fitness_in_run = sorted_fitness[0]
        average_fitness = np.mean(sorted_fitness)

        logger.info("-- End of generation %d --", self.g)
        # +1 accounts for the unperturbed current individual that was evaluated as well
        logger.info(" Evaluated %d individuals", len(weighted_fitness_list) + 1)
        logger.info(' Best Fitness: %.4f', self.best_fitness_in_run)
        logger.info(' Average Fitness: %.4f', average_fitness)

        # **************************************************************************************************************
        # Storing Generation Parameters / Results in the trajectory
        # **************************************************************************************************************
        # These entries correspond to the generation that has been simulated prior to this post-processing run

        # Documentation of algorithm parameters for the current generation
        #
        # generation                 - The index of the evaluated generation
        # best_fitness_in_run        - The highest fitness among the perturbed individuals in the
        #                              evaluated generation
        # current_individual_fitness - The fitness of the unperturbed current individual
        # average_fitness_in_run     - The mean fitness of the perturbed individuals
        # pop_size                   - Population size
        generation_result_dict = {
            'generation': self.g,
            'best_fitness_in_run': self.best_fitness_in_run,
            'current_individual_fitness': current_individual_fitness,
            'average_fitness_in_run': average_fitness,
            'pop_size': self.pop_size
        }

        generation_name = 'generation_{}'.format(self.g)
        traj.results.generation_params.f_add_result_group(generation_name)
        traj.results.generation_params.f_add_result(
            generation_name + '.algorithm_params',
            generation_result_dict,
            comment="These are the parameters that correspond to the algorithm. "
                    "Look at the source code for `EvolutionStrategiesOptimizer::post_process()` "
                    "for comments documenting these parameters"
        )

        if fitness_shaping_enabled:
            # Replace the raw fitnesses by utilities that only depend on the rank of the individual
            fitnesses_to_fit = self._rank_utilities(len(sorted_fitness))
        else:
            fitnesses_to_fit = sorted_fitness

        assert len(fitnesses_to_fit) == len(sorted_perturbations)

        # theta <- theta + alpha * sum{F_i * e_i} / (n * sigma^2)
        weighted_perturbation_sum = np.sum(
            [f * e for f, e in zip(fitnesses_to_fit, sorted_perturbations)], axis=0)
        self.current_individual_arr += learning_rate * weighted_perturbation_sum \
            / (len(fitnesses_to_fit) * noise_std ** 2)

        # **************************************************************************************************************
        # Create the next generation by perturbing the updated current individual
        # **************************************************************************************************************
        # Note that this is only done in case the evaluated run is not the last run
        self.eval_pop.clear()

        # check if to stop
        if self.g < n_iteration - 1 and self.best_fitness_in_run < stop_criterion:
            self._create_eval_pop(traj)

            self.g += 1  # Update generation counter
            self._expand_trajectory(traj)

    def end(self, traj):
        """
        See :meth:`~l2l.optimizers.optimizer.Optimizer.end`

        Stores the best individual and best fitness of the last evaluated generation, together with the number
        of generations that were run, in the trajectory.
        """
        best_last_indiv_dict = self.best_individual

        traj.f_add_result('final_individual', best_last_indiv_dict)
        traj.f_add_result('final_fitness', self.best_fitness_in_run)
        # `self.g` is the zero-based index of the last generation
        traj.f_add_result('n_iteration', self.g + 1)

        # ------------ Finished all runs and print result --------------- #
        logger.info("-- End of (successful) ES optimization --")