Source code for ezclimate.optimization.genetic_algorithm

from __future__ import division, print_function
import numpy as np
import multiprocessing
from ezclimate.tools import _pickle_method, _unpickle_method
try:
    import copy_reg
except:
    import copyreg as copy_reg
import types

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

[docs]class GeneticAlgorithm(object): """Optimization algorithm for the EZ-Climate model. Parameters ---------- pop_amount : int number of individuals in the population num_feature : int number of elements in each individual, i.e. number of nodes in tree-model num_generations : int number of generations of the populations to be evaluated bound : float upper bound of mitigation in each node cx_prob : float probability of mating mut_prob : float probability of mutation. utility : `Utility` object object of utility class fixed_values : ndarray, optional nodes to keep fixed fixed_indicies : ndarray, optional indicies of nodes to keep fixed print_progress : bool, optional if the progress of the evolution should be printed Attributes ---------- pop_amount : int number of individuals in the population num_feature : int number of elements in each individual, i.e. number of nodes in tree-model num_generations : int number of generations of the populations to be evaluated bound : float upper bound of mitigation in each node cx_prob : float probability of mating mut_prob : float probability of mutation. u : `Utility` object object of utility class fixed_values : ndarray, optional nodes to keep fixed fixed_indicies : ndarray, optional indicies of nodes to keep fixed print_progress : bool, optional if the progress of the evolution should be printed """ def __init__(self, pop_amount, num_generations, cx_prob, mut_prob, bound, num_feature, utility, fixed_values=None, fixed_indicies=None, print_progress=False): self.num_feature = num_feature self.pop_amount = pop_amount self.num_gen = num_generations self.cx_prob = cx_prob self.mut_prob = mut_prob self.u = utility self.bound = bound self.fixed_values = fixed_values self.fixed_indicies = fixed_indicies self.print_progress = print_progress def _generate_population(self, size): """Return 1D-array of random values in the given bound as the initial population.""" pop = np.random.random([size, self.num_feature])*self.bound if self.fixed_values is not None: for ind in pop: ind[self.fixed_indicies] = self.fixed_values return pop def _evaluate(self, indvidual): """Returns the utility of given individual.""" return self.u.utility(indvidual) def _select(self, pop, rate): """Returns a 1D-array of selected individuals. Parameters ---------- pop : ndarray population given by 2D-array with shape ('pop_amount', 'num_feature') rate : float the probability of an individual being selected Returns ------- ndarray selected individuals """ index = np.random.choice(self.pop_amount, int(rate*self.pop_amount), replace=False) return pop[index,:] def _random_index(self, individuals, size): """Generate a random index of individuals of size 'size'. Parameters ---------- individuals : ndarray or list 2D-array of individuals size : int number of indices to generate Returns ------- ndarray 1D-array of indices """ inds_size = len(individuals) return np.random.choice(inds_size, size) def _selection_tournament(self, pop, k, tournsize, fitness): """Select `k` individuals from the input `individuals` using `k` tournaments of `tournsize` individuals. Parameters ---------- individuals : ndarray or list 2D-array of individuals to select from k : int number of individuals to select tournsize : int number of individuals participating in each tournament Returns ------- ndarray s selected individuals """ chosen = [] for i in xrange(k): index = self._random_index(pop, tournsize) aspirants = pop[index] aspirants_fitness = fitness[index] chosen_index = np.where(aspirants_fitness == np.max(aspirants_fitness))[0] if len(chosen_index) != 0: chosen_index = chosen_index[0] chosen.append(aspirants[chosen_index]) return np.array(chosen) def _two_point_cross_over(self, pop): """Performs a two-point cross-over of the population. Parameters ---------- pop : ndarray population given by 2D-array with shape ('pop_amount', 'num_feature') """ child_group1 = pop[::2] child_group2 = pop[1::2] for child1, child2 in zip(child_group1, child_group2): if np.random.random() <= self.cx_prob: cxpoint1 = np.random.randint(1, self.num_feature) cxpoint2 = np.random.randint(1, self.num_feature - 1) if cxpoint2 >= cxpoint1: cxpoint2 += 1 else: # Swap the two cx points cxpoint1, cxpoint2 = cxpoint2, cxpoint1 child1[cxpoint1:cxpoint2], child2[cxpoint1:cxpoint2] \ = child2[cxpoint1:cxpoint2].copy(), child1[cxpoint1:cxpoint2].copy() if self.fixed_values is not None: child1[self.fixed_indicies] = self.fixed_values child2[self.fixed_indicies] = self.fixed_values def _uniform_cross_over(self, pop, ind_prob): """Performs a uniform cross-over of the population. Parameters ---------- pop : ndarray population given by 2D-array with shape ('pop_amount', 'num_feature') ind_prob : float probability of feature cross-over """ child_group1 = pop[::2] child_group2 = pop[1::2] for child1, child2 in zip(child_group1, child_group2): size = min(len(child1), len(child2)) for i in range(size): if np.random.random() < ind_prob: child1[i], child2[i] = child2[i], child1[i] def _mutate(self, pop, ind_prob, scale=2.0): """Mutates individual's elements. The individual has a probability of `mut_prob` of beeing selected and every element in this individual has a probability `ind_prob` of beeing mutated. The mutated value is a random number. Parameters ---------- pop : ndarray population given by 2D-array with shape ('pop_amount', 'num_feature') ind_prob : float probability of feature mutation scale : float scaling constant of the random generated number for mutation """ pop_tmp = np.copy(pop) mutate_index = np.random.choice(self.pop_amount, int(self.mut_prob * self.pop_amount), replace=False) for i in mutate_index: feature_index = np.random.choice(self.num_feature, int(ind_prob * self.num_feature), replace=False) for j in feature_index: if self.fixed_indicies is not None and j in self.fixed_indicies: continue else: pop[i][j] = max(0.0, pop[i][j]+(np.random.random()-0.5)*scale) def _uniform_mutation(self, pop, ind_prob, scale=2.0): """Mutates individual's elements. The individual has a probability of `mut_prob` of beeing selected and every element in this individual has a probability `ind_prob` of beeing mutated. The mutated value is the current value plus a scaled uniform [-0.5,0.5] random value. Parameters ---------- pop : ndarray population given by 2D-array with shape ('pop_amount', 'num_feature') ind_prob : float probability of feature mutation scale : float scaling constant of the random generated number for mutation """ pop_len = len(pop) mutate_index = np.random.choice(pop_len, int(self.mut_prob * pop_len), replace=False) for i in mutate_index: prob = np.random.random(self.num_feature) inc = (np.random.random(self.num_feature) - 0.5)*scale pop[i] += (prob > (1.0-ind_prob)).astype(int)*inc pop[i] = np.maximum(1e-5, pop[i]) if self.fixed_values is not None: pop[i][self.fixed_indicies] = self.fixed_values def _show_evolution(self, fits, pop): """Print statistics of the evolution of the population.""" length = len(pop) mean = fits.mean() std = fits.std() min_val = fits.min() max_val = fits.max() print (" Min {} \n Max {} \n Avg {}".format(min_val, max_val, mean)) print (" Std {} \n Population Size {}".format(std, length)) print (" Best Individual: ", pop[np.argmax(fits)]) def _survive(self, pop_tmp, fitness_tmp): """The 80 percent of the individuals with best fitness survives to the next generation. Parameters ---------- pop_tmp : ndarray population fitness_tmp : ndarray fitness values of `pop_temp` Returns ------- ndarray individuals that survived """ index_fits = np.argsort(fitness_tmp)[::-1] fitness = fitness_tmp[index_fits] pop = pop_tmp[index_fits] num_survive = int(0.8*self.pop_amount) survive_pop = np.copy(pop[:num_survive]) survive_fitness = np.copy(fitness[:num_survive]) return np.copy(survive_pop), np.copy(survive_fitness)
[docs] def run(self): """Start the evolution process. The evolution steps are: 1. Select the individuals to perform cross-over and mutation. 2. Cross over among the selected candidate. 3. Mutate result as offspring. 4. Combine the result of offspring and parent together. And selected the top 80 percent of original population amount. 5. Random Generate 20 percent of original population amount new individuals and combine the above new population. Returns ------- tuple final population and the fitness for the final population Note ---- Uses the :mod:`~multiprocessing` package. """ print("----------------Genetic Evolution Starting----------------") pop = self._generate_population(self.pop_amount) pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()) fitness = pool.map(self._evaluate, pop) # how do we know pop[i] belongs to fitness[i]? fitness = np.array([val[0] for val in fitness]) u_hist = np.zeros(self.num_gen) for g in range(0, self.num_gen): print ("-- Generation {} --".format(g+1)) pop_select = self._select(np.copy(pop), rate=1) self._uniform_cross_over(pop_select, 0.50) self._uniform_mutation(pop_select, 0.25, np.exp(-float(g)/self.num_gen)**2) #self._mutate(pop_select, 0.05) fitness_select = pool.map(self._evaluate, pop_select) fitness_select = np.array([val[0] for val in fitness_select]) pop_tmp = np.append(pop, pop_select, axis=0) fitness_tmp = np.append(fitness, fitness_select, axis=0) pop_survive, fitness_survive = self._survive(pop_tmp, fitness_tmp) pop_new = self._generate_population(self.pop_amount - len(pop_survive)) fitness_new = pool.map(self._evaluate, pop_new) fitness_new = np.array([val[0] for val in fitness_new]) pop = np.append(pop_survive, pop_new, axis=0) fitness = np.append(fitness_survive, fitness_new, axis=0) if self.print_progress: self._show_evolution(fitness, pop) u_hist[g] = fitness[0] fitness = pool.map(self._evaluate, pop) fitness = np.array([val[0] for val in fitness]) return pop, fitness