from __future__ import division, print_function
from abc import ABCMeta, abstractmethod
import numpy as np
from storage_tree import BigStorageTree, SmallStorageTree
np.seterr(all='ignore')
[docs]class EZUtility(object):
"""Calculation of Epstein-Zin utility for the EZ-Climate model.
The Epstein-Zin utility allows for different rates of substitution across time and
states. For specification see DLW-paper.
Parameters
----------
tree : `TreeModel` object
tree structure used
damage : `Damage` object
class that provides damage methods
cost : `Cost` object
class that procides cost methods
period_len : float
subinterval length
eis : float, optional
elasticity of intertemporal substitution
ra : float, optional
risk-aversion
time_pref : float, optional
pure rate of time preference
Attributes
----------
tree : `TreeModel` object
tree structure used
damage : `Damage` object
class that provides damage methods
cost : `Cost` object
class that procides cost methods
period_len : float
subinterval length
decision_times : ndarray
years in the future where decisions will be made
cons_growth : float
consumption growth
growth_term : float
1 + cons_growth
r : float
the parameter rho from the DLW-paper
a : float
the parameter alpha from the DLW-paper
b : float
the parameter beta from the DLW-paper
"""
def __init__(self, tree, damage, cost, period_len, eis=0.9, ra=7.0, time_pref=0.005,
add_penalty_cost=False, max_penalty=0.0, penalty_scale=1.0):
self.tree = tree
self.damage = damage
self.cost = cost
self.period_len = period_len
self.decision_times = tree.decision_times
self.cons_growth = damage.cons_growth
self.growth_term = 1.0 + self.cons_growth
self.r = 1.0 - 1.0/eis
self.a = 1.0 - ra
self.b = (1.0-time_pref)**period_len
self.potential_cons = np.ones(self.decision_times.shape) + self.cons_growth
self.potential_cons = self.potential_cons ** self.decision_times
self.add_penalty_cost = add_penalty_cost
self.max_penalty = max_penalty
self.penalty_scale = penalty_scale
def _end_period_utility(self, m, utility_tree, cons_tree, cost_tree):
"""Calculate the terminal utility."""
period_ave_mitigation = self.damage.average_mitigation(m, self.tree.num_periods)
period_damage = self.damage.damage_function(m, self.tree.num_periods)
damage_nodes = self.tree.get_nodes_in_period(self.tree.num_periods)
period_mitigation = m[damage_nodes[0]:damage_nodes[1]+1]
period_cost = self.cost.cost(self.tree.num_periods, period_mitigation, period_ave_mitigation)
continuation = (1.0 / (1.0 - self.b*(self.growth_term**self.r)))**(1.0/self.r)
cost_tree.set_value(cost_tree.last_period, period_cost)
period_consumption = self.potential_cons[-1] * (1.0 - period_damage)
period_consumption[period_consumption<=0.0] = 1e-18
cons_tree.set_value(cons_tree.last_period, period_consumption)
utility_tree.set_value(utility_tree.last_period, (1.0 - self.b)**(1.0/self.r) * cons_tree.last * continuation)
def _end_period_marginal_utility(self, mu_tree_0, mu_tree_1, ce_tree, utility_tree, cons_tree):
"""Calculate the terminal marginal utility."""
ce_term = utility_tree.last**self.r - (1.0 - self.b)*cons_tree.last**self.r
ce_tree.set_value(ce_tree.last_period, ce_term)
mu_0_last = (1.0 - self.b)*(utility_tree[utility_tree.last_period-self.period_len] / cons_tree.last)**(1.0-self.r)
mu_tree_0.set_value(mu_tree_0.last_period, mu_0_last)
mu_0 = self._mu_0(cons_tree[cons_tree.last_period-self.period_len], ce_tree[ce_tree.last_period-self.period_len])
mu_tree_0.set_value(mu_tree_0.last_period-self.period_len, mu_0)
next_term = self.b * (1.0 - self.b) / (1.0 - self.b * self.growth_term**self.r)
mu_1 = utility_tree[utility_tree.last_period-self.period_len]**(1-self.r) * next_term * cons_tree.last**(self.r-1.0)
mu_tree_1.set_value(mu_tree_1.last_period-self.period_len, mu_1)
def _certain_equivalence(self, period, damage_period, utility_tree):
"""Caclulate certainty equivalence utility. If we are between decision nodes, i.e. no branching,
then certainty equivalent utility at time period depends only on the utility next period
given information known today. Otherwise the certainty equivalent utility is the ability
weighted sum of next period utility over the partition reachable from the state.
"""
if utility_tree.is_information_period(period):
damage_nodes = self.tree.get_nodes_in_period(damage_period+1)
probs = self.tree.node_prob[damage_nodes[0]:damage_nodes[1]+1]
even_probs = probs[::2]
odd_probs = probs[1::2]
even_util = ((utility_tree.get_next_period_array(period)[::2])**self.a) * even_probs
odd_util = ((utility_tree.get_next_period_array(period)[1::2])**self.a) * odd_probs
ave_util = (even_util + odd_util) / (even_probs + odd_probs)
cert_equiv = ave_util**(1.0/self.a)
else:
# no branching implies certainty equivalent utility at time period depends only on
# the utility next period given information known today
cert_equiv = utility_tree.get_next_period_array(period)
return cert_equiv
def _utility_generator(self, m, utility_tree, cons_tree, cost_tree, ce_tree, cons_adj=0.0):
"""Generator for calculating utility for each utility period besides the terminal utility."""
periods = utility_tree.periods[::-1]
for period in periods[1:]:
damage_period = utility_tree.between_decision_times(period)
cert_equiv = self._certain_equivalence(period, damage_period, utility_tree)
if utility_tree.is_decision_period(period+self.period_len):
damage_nodes = self.tree.get_nodes_in_period(damage_period)
period_mitigation = m[damage_nodes[0]:damage_nodes[1]+1]
period_ave_mitigation = self.damage.average_mitigation(m, damage_period)
period_cost = self.cost.cost(damage_period, period_mitigation, period_ave_mitigation)
period_damage = self.damage.damage_function(m, damage_period)
cost_tree.set_value(cost_tree.index_below(period+self.period_len), period_cost)
period_consumption = self.potential_cons[damage_period] * (1.0 - period_damage) * (1.0 - period_cost)
period_consumption[period_consumption <= 0.0] = 1e-18
if not utility_tree.is_decision_period(period):
next_consumption = cons_tree.get_next_period_array(period)
segment = period - utility_tree.decision_times[damage_period]
interval = segment + utility_tree.subinterval_len
if utility_tree.is_decision_period(period+self.period_len):
if period < utility_tree.decision_times[-2]:
next_cost = cost_tree[period+self.period_len]
next_consumption *= (1.0 - np.repeat(period_cost,2)) / (1.0 - next_cost)
next_consumption[next_consumption<=0.0] = 1e-18
if period < utility_tree.decision_times[-2]:
temp_consumption = next_consumption/np.repeat(period_consumption,2)
period_consumption = np.sign(temp_consumption)*(np.abs(temp_consumption)**(segment/float(interval))) \
* np.repeat(period_consumption,2)
else:
temp_consumption = next_consumption/period_consumption
period_consumption = np.sign(temp_consumption)*(np.abs(temp_consumption)**(segment/float(interval))) \
* period_consumption
if period == 0:
period_consumption += cons_adj
ce_term = self.b * cert_equiv**self.r
ce_tree.set_value(period, ce_term)
cons_tree.set_value(period, period_consumption)
u = ((1.0-self.b)*period_consumption**self.r + ce_term)**(1.0/self.r)
yield u, period
[docs] def utility(self, m, return_trees=False):
"""Calculating utility for the specific mitigation decisions `m`.
Parameters
----------
m : ndarray or list
array of mitigations
return_trees : bool
True if methid should return trees calculated in producing the utility
Returns
-------
ndarray or tuple
tuple of `BaseStorageTree` if return_trees else ndarray with utility at period 0
Examples:
---------
Assuming we have declared a EZUtility object as 'ezu' and have a mitigation array 'm'
>>> ezu.utility(m)
array([ 9.83391921])
>>> utility_tree, cons_tree, cost_tree, ce_tree = ezu.utility(m, return_trees=True)
"""
utility_tree = BigStorageTree(subinterval_len=self.period_len, decision_times=self.decision_times)
cons_tree = BigStorageTree(subinterval_len=self.period_len, decision_times=self.decision_times)
ce_tree = BigStorageTree(subinterval_len=self.period_len, decision_times=self.decision_times)
cost_tree = SmallStorageTree(decision_times=self.decision_times)
self._end_period_utility(m, utility_tree, cons_tree, cost_tree)
it = self._utility_generator(m, utility_tree, cons_tree, cost_tree, ce_tree)
for u, period in it:
utility_tree.set_value(period, u)
if return_trees:
return utility_tree, cons_tree, cost_tree, ce_tree
return utility_tree[0]
[docs] def adjusted_utility(self, m, period_cons_eps=None, node_cons_eps=None, final_cons_eps=0.0,
first_period_consadj=0.0, return_trees=False):
"""Calculating adjusted utility for sensitivity analysis. Used e.g. to find zero-coupon bond price.
Values in parameters are used to adjusted the utility in different ways.
Parameters
----------
m : ndarray
array of mitigations
period_cons_eps : ndarray, optional
array of increases in consumption per period
node_cons_eps : `SmallStorageTree`, optional
increases in consumption per node
final_cons_eps : float, optional
value to increase the final utilities by
first_period_consadj : float, optional
value to increase consumption at period 0 by
return_trees : bool, optional
True if method should return trees calculculated in producing the utility
Returns
-------
ndarray or tuple
tuple of `BaseStorageTree` if return_trees else ndarray with utility at period 0
Examples
---------
Assuming we have declared a EZUtility object as 'ezu' and have a mitigation array 'm'
>>> ezu.adjusted_utility(m, final_cons_eps=0.1)
array([ 9.83424045])
>>> utility_tree, cons_tree, cost_tree, ce_tree = ezu.adjusted_utility(m, final_cons_eps=0.1, return_trees=True)
>>> arr = np.zeros(int(ezu.decision_times[-1]/ezu.period_len) + 1)
>>> arr[-1] = 0.1
>>> ezu.adjusted_utility(m, period_cons_eps=arr)
array([ 9.83424045])
>>> bst = BigStorageTree(5.0, [0, 15, 45, 85, 185, 285, 385])
>>> bst.set_value(bst.last_period, np.repeat(0.01, len(bst.last)))
>>> ezu.adjusted_utility(m, node_cons_eps=bst)
array([ 9.83391921])
The last example differs from the rest in that the last values of the `node_cons_eps` will never be
used. Hence if you want to update the last period consumption, use one of these two methods.
>>> ezu.adjusted_utility(m, first_period_consadj=0.01)
array([ 9.84518772])
"""
utility_tree = BigStorageTree(subinterval_len=self.period_len, decision_times=self.decision_times)
cons_tree = BigStorageTree(subinterval_len=self.period_len, decision_times=self.decision_times)
ce_tree = BigStorageTree(subinterval_len=self.period_len, decision_times=self.decision_times)
cost_tree = SmallStorageTree(decision_times=self.decision_times)
periods = utility_tree.periods[::-1]
if period_cons_eps is None:
period_cons_eps = np.zeros(len(periods))
if node_cons_eps is None:
node_cons_eps = BigStorageTree(subinterval_len=self.period_len, decision_times=self.decision_times)
self._end_period_utility(m, utility_tree, cons_tree, cost_tree)
it = self._utility_generator(m, utility_tree, cons_tree, cost_tree, ce_tree, first_period_consadj)
i = len(utility_tree)-2
for u, period in it:
if period == periods[1]:
mu_0 = (1.0-self.b) * (u/cons_tree[period])**(1.0-self.r)
next_term = self.b * (1.0-self.b) / (1.0-self.b*self.growth_term**self.r)
mu_1 = (u**(1.0-self.r)) * next_term * (cons_tree.last**(self.r-1.0))
u += (final_cons_eps+period_cons_eps[-1]+node_cons_eps.last) * mu_1
u += (period_cons_eps[i]+node_cons_eps.tree[period]) * mu_0
utility_tree.set_value(period, u)
else:
mu_0, m_1, m_2 = self._period_marginal_utility(mu_0, mu_1, m, period, utility_tree, cons_tree, ce_tree)
u += (period_cons_eps[i] + node_cons_eps.tree[period])*mu_0
utility_tree.set_value(period, u)
i -= 1
if return_trees:
return utility_tree, cons_tree, cost_tree, ce_tree
return utility_tree.tree[0]
def _mu_0(self, cons, ce_term):
"""Marginal utility with respect to consumption function."""
t1 = (1.0 - self.b)*cons**(self.r-1.0)
t2 = (ce_term - (self.b-1.0)*cons**self.r)**((1.0/self.r)-1.0)
return t1 * t2
def _mu_1(self, cons, prob, cons_1, cons_2, ce_1, ce_2, do_print=False):
""" marginal utility with respect to consumption next period."""
t1 = (1.0-self.b) * self.b * prob * cons_1**(self.r-1.0)
t2 = (ce_1 - (self.b-1.0) * cons_1**self.r )**((self.a/self.r)-1)
t3 = (prob * (ce_1 - (self.b*(cons_1**self.r)) + cons_1**self.r)**(self.a/self.r) \
+ (1.0-prob) * (ce_2 - (self.b-1.0) * cons_2**self.r)**(self.a/self.r))**((self.r/self.a)-1.0)
t4 = prob * (ce_1-self.b * (cons_1**self.r) + cons_1**self.r)**(self.a/self.r) \
+ (1.0-prob) * (ce_2 - self.b * (cons_2**self.r) + cons_2**self.r)**(self.a/self.r)
t5 = (self.b * t4**(self.r/self.a) - (self.b-1.0) * cons**self.r )**((1.0/self.r)-1.0)
return t1 * t2 * t3 * t5
def _mu_2(self, cons, prev_cons, ce_term):
"""Marginal utility with respect to last period consumption."""
t1 = (1.0-self.b) * self.b * prev_cons**(self.r-1.0)
t2 = ((1.0 - self.b) * cons**self.r - (self.b - 1.0) * self.b \
* prev_cons**self.r + self.b * ce_term)**((1.0/self.r)-1.0)
return t1 * t2
def _period_marginal_utility(self, prev_mu_0, prev_mu_1, m, period, utility_tree, cons_tree, ce_tree):
"""Marginal utility for each node in a period."""
damage_period = utility_tree.between_decision_times(period)
mu_0 = self._mu_0(cons_tree[period], ce_tree[period])
prev_ce = ce_tree.get_next_period_array(period)
prev_cons = cons_tree.get_next_period_array(period)
if utility_tree.is_information_period(period):
probs = self.tree.get_probs_in_period(damage_period+1)
up_prob = np.array([probs[i]/(probs[i]+probs[i+1]) for i in range(0, len(probs), 2)])
down_prob = 1.0 - up_prob
up_cons = prev_cons[::2]
down_cons = prev_cons[1::2]
up_ce = prev_ce[::2]
down_ce = prev_ce[1::2]
mu_1 = self._mu_1(cons_tree[period], up_prob, up_cons, down_cons, up_ce, down_ce)
mu_2 = self._mu_1(cons_tree[period], down_prob, down_cons, up_cons, down_ce, up_ce)
return mu_0, mu_1, mu_2
else:
mu_1 = self._mu_2(cons_tree[period], prev_cons, prev_ce)
return mu_0, mu_1, None
[docs] def marginal_utility(self, m, utility_tree, cons_tree, cost_tree, ce_tree):
"""Calculating marginal utility for sensitivity analysis, e.g. in the SSC decomposition.
Parameters
----------
m : ndarray
array of mitigations
utility_tree : `BigStorageTree` object
utility values from using mitigation `m`
cons_tree : `BigStorageTree` object
consumption values from using mitigation `m`
cost_tree : `SmallStorageTree` object
cost values from using mitigation `m`
ce_tree : `BigStorageTree` object
certain equivalence values from using mitigation `m`
Returns
-------
tuple
marginal utility tree
Examples
--------
Assuming we have declared a EZUtility object as 'ezu' and have a mitigation array 'm'.
>>>
>>> utility_tree, cons_tree, cost_tree, ce_tree = ezu.utility(m, return_trees=True)
>>> mu_0_tree, mu_1_tree, mu_2_tree = ezu.marginal_utility(m, utility_tree, cons_tree, cost_tree, ce_tree)
>>> mu_0_tree[0] # value at period 0
array([ 0.33001256])
>>> mu_1_tree[0] # value at period 0
array([ 0.15691619])
>>> mu_2_tree[0] # value at period 0
array([ 0.13948175])
"""
mu_tree_0 = BigStorageTree(subinterval_len=self.period_len, decision_times=self.decision_times)
mu_tree_1 = BigStorageTree(subinterval_len=self.period_len, decision_times=self.decision_times)
mu_tree_2 = SmallStorageTree(decision_times=self.decision_times)
self._end_period_marginal_utility(mu_tree_0, mu_tree_1, ce_tree, utility_tree, cons_tree)
periods = utility_tree.periods[::-1]
for period in periods[2:]:
mu_0, mu_1, mu_2 = self._period_marginal_utility(mu_tree_0.get_next_period_array(period),
mu_tree_1.get_next_period_array(period), m, period, utility_tree, cons_tree, ce_tree)
mu_tree_0.set_value(period, mu_0)
mu_tree_1.set_value(period, mu_1)
if mu_2 is not None:
mu_tree_2.set_value(period, mu_2)
return mu_tree_0, mu_tree_1, mu_tree_2
[docs] def partial_grad(self, m, i, delta=1e-8):
"""Calculate the ith element of the gradient vector.
Parameters
----------
m : ndarray
array of mitigations
i : int
node to calculate partial grad for
Returns
-------
float
gradient element
"""
m_copy = m.copy()
m_copy[i] -= delta
minus_utility = self.utility(m_copy)
m_copy[i] += 2*delta
plus_utility = self.utility(m_copy)
grad = (plus_utility-minus_utility) / (2*delta)
return grad