Source code for ezclimate.optimization

from __future__ import division, print_function
import numpy as np
import multiprocessing
from tools import _pickle_method, _unpickle_method
try:
    import copy_reg
except:
    import copyreg as copy_reg
import types

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

[docs]class GeneticAlgorithm(object): """Optimization algorithm for the EZ-Climate model. Parameters ---------- pop_amount : int number of individuals in the population num_feature : int number of elements in each individual, i.e. number of nodes in tree-model num_generations : int number of generations of the populations to be evaluated bound : float upper bound of mitigation in each node cx_prob : float probability of mating mut_prob : float probability of mutation. utility : `Utility` object object of utility class fixed_values : ndarray, optional nodes to keep fixed fixed_indicies : ndarray, optional indicies of nodes to keep fixed print_progress : bool, optional if the progress of the evolution should be printed Attributes ---------- pop_amount : int number of individuals in the population num_feature : int number of elements in each individual, i.e. number of nodes in tree-model num_generations : int number of generations of the populations to be evaluated bound : float upper bound of mitigation in each node cx_prob : float probability of mating mut_prob : float probability of mutation. u : `Utility` object object of utility class fixed_values : ndarray, optional nodes to keep fixed fixed_indicies : ndarray, optional indicies of nodes to keep fixed print_progress : bool, optional if the progress of the evolution should be printed """ def __init__(self, pop_amount, num_generations, cx_prob, mut_prob, bound, num_feature, utility, fixed_values=None, fixed_indicies=None, print_progress=False): self.num_feature = num_feature self.pop_amount = pop_amount self.num_gen = num_generations self.cx_prob = cx_prob self.mut_prob = mut_prob self.u = utility self.bound = bound self.fixed_values = fixed_values self.fixed_indicies = fixed_indicies self.print_progress = print_progress def _generate_population(self, size): """Return 1D-array of random values in the given bound as the initial population.""" pop = np.random.random([size, self.num_feature])*self.bound if self.fixed_values is not None: for ind in pop: ind[self.fixed_indicies] = self.fixed_values return pop def _evaluate(self, indvidual): """Returns the utility of given individual.""" return self.u.utility(indvidual) def _select(self, pop, rate): """Returns a 1D-array of selected individuals. Parameters ---------- pop : ndarray population given by 2D-array with shape ('pop_amount', 'num_feature') rate : float the probability of an individual being selected Returns ------- ndarray selected individuals """ index = np.random.choice(self.pop_amount, int(rate*self.pop_amount), replace=False) return pop[index,:] def _random_index(self, individuals, size): """Generate a random index of individuals of size 'size'. Parameters ---------- individuals : ndarray or list 2D-array of individuals size : int number of indices to generate Returns ------- ndarray 1D-array of indices """ inds_size = len(individuals) return np.random.choice(inds_size, size) def _selection_tournament(self, pop, k, tournsize, fitness): """Select `k` individuals from the input `individuals` using `k` tournaments of `tournsize` individuals. Parameters ---------- individuals : ndarray or list 2D-array of individuals to select from k : int number of individuals to select tournsize : int number of individuals participating in each tournament Returns ------- ndarray s selected individuals """ chosen = [] for i in xrange(k): index = self._random_index(pop, tournsize) aspirants = pop[index] aspirants_fitness = fitness[index] chosen_index = np.where(aspirants_fitness == np.max(aspirants_fitness))[0] if len(chosen_index) != 0: chosen_index = chosen_index[0] chosen.append(aspirants[chosen_index]) return np.array(chosen) def _two_point_cross_over(self, pop): """Performs a two-point cross-over of the population. Parameters ---------- pop : ndarray population given by 2D-array with shape ('pop_amount', 'num_feature') """ child_group1 = pop[::2] child_group2 = pop[1::2] for child1, child2 in zip(child_group1, child_group2): if np.random.random() <= self.cx_prob: cxpoint1 = np.random.randint(1, self.num_feature) cxpoint2 = np.random.randint(1, self.num_feature - 1) if cxpoint2 >= cxpoint1: cxpoint2 += 1 else: # Swap the two cx points cxpoint1, cxpoint2 = cxpoint2, cxpoint1 child1[cxpoint1:cxpoint2], child2[cxpoint1:cxpoint2] \ = child2[cxpoint1:cxpoint2].copy(), child1[cxpoint1:cxpoint2].copy() if self.fixed_values is not None: child1[self.fixed_indicies] = self.fixed_values child2[self.fixed_indicies] = self.fixed_values def _uniform_cross_over(self, pop, ind_prob): """Performs a uniform cross-over of the population. Parameters ---------- pop : ndarray population given by 2D-array with shape ('pop_amount', 'num_feature') ind_prob : float probability of feature cross-over """ child_group1 = pop[::2] child_group2 = pop[1::2] for child1, child2 in zip(child_group1, child_group2): size = min(len(child1), len(child2)) for i in range(size): if np.random.random() < ind_prob: child1[i], child2[i] = child2[i], child1[i] def _mutate(self, pop, ind_prob, scale=2.0): """Mutates individual's elements. The individual has a probability of `mut_prob` of beeing selected and every element in this individual has a probability `ind_prob` of beeing mutated. The mutated value is a random number. Parameters ---------- pop : ndarray population given by 2D-array with shape ('pop_amount', 'num_feature') ind_prob : float probability of feature mutation scale : float scaling constant of the random generated number for mutation """ pop_tmp = np.copy(pop) mutate_index = np.random.choice(self.pop_amount, int(self.mut_prob * self.pop_amount), replace=False) for i in mutate_index: feature_index = np.random.choice(self.num_feature, int(ind_prob * self.num_feature), replace=False) for j in feature_index: if self.fixed_indicies is not None and j in self.fixed_indicies: continue else: pop[i][j] = max(0.0, pop[i][j]+(np.random.random()-0.5)*scale) def _uniform_mutation(self, pop, ind_prob, scale=2.0): """Mutates individual's elements. The individual has a probability of `mut_prob` of beeing selected and every element in this individual has a probability `ind_prob` of beeing mutated. The mutated value is the current value plus a scaled uniform [-0.5,0.5] random value. Parameters ---------- pop : ndarray population given by 2D-array with shape ('pop_amount', 'num_feature') ind_prob : float probability of feature mutation scale : float scaling constant of the random generated number for mutation """ pop_len = len(pop) mutate_index = np.random.choice(pop_len, int(self.mut_prob * pop_len), replace=False) for i in mutate_index: prob = np.random.random(self.num_feature) inc = (np.random.random(self.num_feature) - 0.5)*scale pop[i] += (prob > (1.0-ind_prob)).astype(int)*inc pop[i] = np.maximum(1e-5, pop[i]) if self.fixed_values is not None: pop[i][self.fixed_indicies] = self.fixed_values def _show_evolution(self, fits, pop): """Print statistics of the evolution of the population.""" length = len(pop) mean = fits.mean() std = fits.std() min_val = fits.min() max_val = fits.max() print (" Min {} \n Max {} \n Avg {}".format(min_val, max_val, mean)) print (" Std {} \n Population Size {}".format(std, length)) print (" Best Individual: ", pop[np.argmax(fits)]) def _survive(self, pop_tmp, fitness_tmp): """The 80 percent of the individuals with best fitness survives to the next generation. Parameters ---------- pop_tmp : ndarray population fitness_tmp : ndarray fitness values of `pop_temp` Returns ------- ndarray individuals that survived """ index_fits = np.argsort(fitness_tmp)[::-1] fitness = fitness_tmp[index_fits] pop = pop_tmp[index_fits] num_survive = int(0.8*self.pop_amount) survive_pop = np.copy(pop[:num_survive]) survive_fitness = np.copy(fitness[:num_survive]) return np.copy(survive_pop), np.copy(survive_fitness)
[docs] def run(self): """Start the evolution process. The evolution steps are: 1. Select the individuals to perform cross-over and mutation. 2. Cross over among the selected candidate. 3. Mutate result as offspring. 4. Combine the result of offspring and parent together. And selected the top 80 percent of original population amount. 5. Random Generate 20 percent of original population amount new individuals and combine the above new population. Returns ------- tuple final population and the fitness for the final population Note ---- Uses the :mod:`~multiprocessing` package. """ print("----------------Genetic Evolution Starting----------------") pop = self._generate_population(self.pop_amount) pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()) fitness = pool.map(self._evaluate, pop) # how do we know pop[i] belongs to fitness[i]? fitness = np.array([val[0] for val in fitness]) u_hist = np.zeros(self.num_gen) for g in range(0, self.num_gen): print ("-- Generation {} --".format(g+1)) pop_select = self._select(np.copy(pop), rate=1) self._uniform_cross_over(pop_select, 0.50) self._uniform_mutation(pop_select, 0.25, np.exp(-float(g)/self.num_gen)**2) #self._mutate(pop_select, 0.05) fitness_select = pool.map(self._evaluate, pop_select) fitness_select = np.array([val[0] for val in fitness_select]) pop_tmp = np.append(pop, pop_select, axis=0) fitness_tmp = np.append(fitness, fitness_select, axis=0) pop_survive, fitness_survive = self._survive(pop_tmp, fitness_tmp) pop_new = self._generate_population(self.pop_amount - len(pop_survive)) fitness_new = pool.map(self._evaluate, pop_new) fitness_new = np.array([val[0] for val in fitness_new]) pop = np.append(pop_survive, pop_new, axis=0) fitness = np.append(fitness_survive, fitness_new, axis=0) if self.print_progress: self._show_evolution(fitness, pop) u_hist[g] = fitness[0] fitness = pool.map(self._evaluate, pop) fitness = np.array([val[0] for val in fitness]) return pop, fitness
[docs]class GradientSearch(object) : """Gradient search optimization algorithm for the EZ-Climate model. Parameters ---------- utility : `Utility` object object of utility class learning_rate : float starting learning rate of gradient descent var_nums : int number of elements in array to optimize accuracy : float stop value for the gradient descent fixed_values : ndarray, optional nodes to keep fixed fixed_indicies : ndarray, optional indicies of nodes to keep fixed print_progress : bool, optional if the progress of the evolution should be printed scale_alpha : ndarray, optional array to scale the learning rate Attributes ---------- utility : `Utility` object object of utility class learning_rate : float starting learning rate of gradient descent var_nums : int number of elements in array to optimize accuracy : float stop value for the gradient descent fixed_values : ndarray, optional nodes to keep fixed fixed_indicies : ndarray, optional indicies of nodes to keep fixed print_progress : bool, optional if the progress of the evolution should be printed scale_alpha : ndarray, optional array to scale the learning rate """ def __init__(self, utility, var_nums, accuracy=1e-06, iterations=100, fixed_values=None, fixed_indicies=None, print_progress=False, scale_alpha=None): self.u = utility self.var_nums = var_nums self.accuracy = accuracy self.iterations = iterations self.fixed_values = fixed_values self.fixed_indicies = fixed_indicies self.print_progress = print_progress self.scale_alpha = scale_alpha if scale_alpha is None: self.scale_alpha = np.exp(np.linspace(0.0, 3.0, var_nums)) def _partial_grad(self, i): """Calculate the ith element of the gradient vector.""" m_copy = self.m.copy() m_copy[i] = m_copy[i] - self.delta if (m_copy[i] - self.delta)>=0 else 0.0 minus_utility = self.u.utility(m_copy) m_copy[i] += 2*self.delta plus_utility = self.u.utility(m_copy) grad = (plus_utility-minus_utility) / (2*self.delta) return grad, i
[docs] def numerical_gradient(self, m, delta=1e-08, fixed_indicies=None): """Calculate utility gradient numerically. Parameters ---------- m : ndarray or list array of mitigation delta : float, optional change in mitigation fixed_indicies : ndarray or list, optional indicies of gradient that should not be calculated Returns ------- ndarray gradient """ self.delta = delta self.m = m if fixed_indicies is None: fixed_indicies = [] grad = np.zeros(len(m)) if not isinstance(m, np.ndarray): self.m = np.array(m) pool = multiprocessing.Pool() indicies = np.delete(range(len(m)), fixed_indicies) res = pool.map(self._partial_grad, indicies) for g, i in res: grad[i] = g pool.close() pool.join() del self.m del self.delta return grad
def _accelerate_scale(self, accelerator, prev_grad, grad): sign_vector = np.sign(prev_grad * grad) scale_vector = np.ones(self.var_nums) * ( 1 + 0.10) accelerator[sign_vector <= 0] = 1 accelerator *= scale_vector return accelerator
[docs] def gradient_descent(self, initial_point, return_last=False): """Gradient descent algorithm. The `initial_point` is updated using the Adam algorithm. Adam uses the history of the gradient to compute individual step sizes for each element in the mitigation vector. The vector of step sizes are calculated using estimates of the first and second moments of the gradient. Parameters ---------- initial_point : ndarray initial guess of the mitigation return_last : bool, optional if True the function returns the last point, else the point with highest utility Returns ------- tuple (best point, best utility) """ num_decision_nodes = initial_point.shape[0] x_hist = np.zeros((self.iterations+1, num_decision_nodes)) u_hist = np.zeros(self.iterations+1) u_hist[0] = self.u.utility(initial_point) x_hist[0] = initial_point beta1, beta2 = 0.90, 0.90 eta = 0.0015 eps = 1e-3 m_t, v_t = 0, 0 prev_grad = 0.0 accelerator = np.ones(self.var_nums) for i in range(self.iterations): grad = self.numerical_gradient(x_hist[i], fixed_indicies=self.fixed_indicies) m_t = beta1*m_t + (1-beta1)*grad v_t = beta2*v_t + (1-beta2)*np.power(grad, 2) m_hat = m_t / (1-beta1**(i+1)) v_hat = v_t / (1-beta2**(i+1)) if i != 0: accelerator = self._accelerate_scale(accelerator, prev_grad, grad) new_x = x_hist[i] + ((eta*m_hat)/(np.square(v_hat)+eps)) * accelerator new_x[new_x < 0] = 0.0 if self.fixed_values is not None: new_x[self.fixed_indicies] = self.fixed_values x_hist[i+1] = new_x u_hist[i+1] = self.u.utility(new_x)[0] prev_grad = grad.copy() if self.print_progress: print("-- Iteration {} -- \n Current Utility: {}".format(i+1, u_hist[i+1])) print(new_x) if return_last: return x_hist[i+1], u_hist[i+1] best_index = np.argmax(u_hist) return x_hist[best_index], u_hist[best_index]
[docs] def run(self, initial_point_list, topk=4): """Initiate the gradient search algorithm. Parameters ---------- initial_point_list : list list of initial points to select from topk : int, optional select and run gradient descent on the `topk` first points of `initial_point_list` Returns ------- tuple best mitigation point and the utility of the best mitigation point Raises ------ ValueError If `topk` is larger than the length of `initial_point_list`. Note ---- Uses the :mod:`~multiprocessing` package. """ print("----------------Gradient Search Starting----------------") if topk > len(initial_point_list): raise ValueError("topk {} > number of initial points {}".format(topk, len(initial_point_list))) candidate_points = initial_point_list[:topk] mitigations = [] utilities = np.zeros(topk) for cp, count in zip(candidate_points, range(topk)): if not isinstance(cp, np.ndarray): cp = np.array(cp) print("Starting process {} of Gradient Descent".format(count+1)) m, u = self.gradient_descent(cp) mitigations.append(m) utilities[count] = u best_index = np.argmax(utilities) return mitigations[best_index], utilities[best_index]
[docs]class CoordinateDescent(object): """Coordinate descent optimization algorithm for the EZ-Climate model. Parameters ---------- utility : `Utility` object object of utility class var_nums : int number of elements in array to optimize accuracy : float stop value for the utility increase iterations : int maximum number of iterations Attributes ---------- utility : `Utility` object object of utility class var_nums : int number of elements in array to optimize accuracy : float stop value for the utility increase iterations : int maximum number of iterations """ def __init__(self, utility, var_nums, accuracy=1e-4, iterations=100): self.u = utility self.var_nums = var_nums self.accuracy = accuracy self.iterations = iterations def _min_func(self, x, m, i): m_copy = m.copy() m_copy[i] = x return -self.u.utility(m_copy)[0] def _minimize_node(self, node, m): from scipy.optimize import fmin return fmin(self._min_func, x0=m[node], args=(m, node), disp=False)
[docs] def run(self, m): """Run the coordinate descent iterations. Parameters ---------- m : initial point Returns ------- tuple best mitigation point and the utility of the best mitigation point Note ---- Uses the :mod:`~scipy` package. """ num_decision_nodes = m.shape[0] x_hist = [] u_hist = [] nodes = range(self.var_nums) x_hist.append(m.copy()) u_hist.append(self.u.utility(m)[0]) print("----------------Coordinate Descent Starting----------------") print("Starting Utility: {}".format(u_hist[0])) for i in range(self.iterations): print("-- Iteration {} --".format(i+1)) node_iteration = np.random.choice(nodes, replace=False, size=len(nodes)) for node in node_iteration: m[node] = max(0.0, self._minimize_node(node, m)) x_hist.append(m.copy()) u_hist.append(self.u.utility(m)[0]) print("Current Utility: {}".format(u_hist[i+1])) if np.abs(u_hist[i+1] - u_hist[i]) < self.accuracy: break return x_hist[-1], u_hist[-1]