from __future__ import division, print_function
import numpy as np
from scipy.optimize import brentq
from storage_tree import BigStorageTree, SmallStorageTree
from optimization import GeneticAlgorithm, GradientSearch
from tools import write_columns_csv, append_to_existing, import_csv
[docs]def additional_ghg_emission(m, utility):
"""Calculate the emission added by every node.
Parameters
----------
m : ndarray or list
array of mitigation
utility : `Utility` object
object of utility class
Returns
-------
ndarray
additional emission in nodes
"""
additional_emission = np.zeros(len(m))
cache = set()
for node in range(utility.tree.num_final_states, len(m)):
path = utility.tree.get_path(node)
for i in range(len(path)):
if path[i] not in cache:
additional_emission[path[i]] = (1.0 - m[path[i]]) * utility.damage.bau.emission_to_ghg[i]
cache.add(path[i])
return additional_emission
[docs]def store_trees(prefix=None, start_year=2015, **kwargs):
"""Saves values of `BaseStorageTree` objects. The file is saved into the 'data' directory
in the current working directory. If there is no 'data' directory, one is created.
Parameters
----------
prefix : str, optional
prefix to be added to file_name
start_year : int, optional
start year of analysis
**kwargs
arbitrary keyword arguments of `BaseStorageTree` objects
"""
if prefix is None:
prefix = ""
for name, tree in kwargs.items():
tree.write_columns(prefix + "trees", name, start_year)
[docs]def delta_consumption(m, utility, cons_tree, cost_tree, delta_m):
"""Calculate the changes in consumption and the mitigation cost component
of consumption when increaseing period 0 mitigiation with `delta_m`.
Parameters
----------
m : ndarray or list
array of mitigation
utility : `Utility` object
object of utility class
cons_tree : `BigStorageTree` object
consumption storage tree of consumption values
from optimal mitigation values
cost_tree : `SmallStorageTree` object
cost storage tree of cost values from optimal mitigation values
delta_m : float
value to increase period 0 mitigation by
Returns
-------
tuple
(storage tree of changes in consumption, ndarray of costs in first sub periods)
"""
m_copy = m.copy()
m_copy[0] += delta_m
new_utility_tree, new_cons_tree, new_cost_tree, new_ce_tree = utility.utility(m_copy, return_trees=True)
for period in new_cons_tree.periods:
new_cons_tree.tree[period] = (new_cons_tree.tree[period]-cons_tree.tree[period]) / delta_m
first_period_intervals = new_cons_tree.first_period_intervals
cost_array = np.zeros((first_period_intervals, 2))
for i in range(first_period_intervals):
potential_consumption = (1.0 + utility.cons_growth)**(new_cons_tree.subinterval_len * i)
cost_array[i, 0] = potential_consumption * cost_tree[0]
cost_array[i, 1] = (potential_consumption * new_cost_tree[0] - cost_array[i, 0]) / delta_m
return new_cons_tree, cost_array, new_utility_tree[0]
[docs]def constraint_first_period(utility, first_node, m_size):
"""Calculate the changes in consumption, the mitigation cost component of consumption,
and new mitigation values when constraining the first period mitigation to `first_node`.
Parameters
----------
m : ndarray or list
array of mitigation
utility : `Utility` object
object of utility class
first_node : float
value to constrain first period to
Returns
-------
tuple
(new mitigation array, storage tree of changes in consumption, ndarray of costs in first sub periods)
"""
fixed_values = np.array([first_node])
fixed_indicies = np.array([0])
ga_model = GeneticAlgorithm(pop_amount=150, num_generations=100, cx_prob=0.8, mut_prob=0.5, bound=1.5,
num_feature=m_size, utility=utility, fixed_values=fixed_values,
fixed_indicies=fixed_indicies, print_progress=True)
gs_model = GradientSearch(var_nums=m_size, utility=utility, accuracy=1e-7,
iterations=250, fixed_values=fixed_values, fixed_indicies=fixed_indicies,
print_progress=True)
final_pop, fitness = ga_model.run()
sort_pop = final_pop[np.argsort(fitness)][::-1]
new_m, new_utility = gs_model.run(initial_point_list=sort_pop, topk=1)
return new_m
[docs]def find_ir(m, utility, payment, a=0.0, b=1.0):
"""Find the price of a bond that creates equal utility at time 0 as adding `payment` to the value of
consumption in the final period. The purpose of this function is to find the interest rate
embedded in the `EZUtility` model.
Parameters
----------
m : ndarray or list
array of mitigation
utility : `Utility` object
object of utility class
payment : float
value added to consumption in the final period
a : float, optional
initial guess
b : float, optional
initial guess - f(b) needs to give different sign than f(a)
Returns
-------
tuple
result of optimization
Note
----
requires the 'scipy' package
"""
def min_func(price):
utility_with_final_payment = utility.adjusted_utility(m, final_cons_eps=payment)
first_period_eps = payment * price
utility_with_initial_payment = utility.adjusted_utility(m, first_period_consadj=first_period_eps)
return utility_with_final_payment - utility_with_initial_payment
return brentq(min_func, a, b)
[docs]def find_term_structure(m, utility, payment, a=0.0, b=1.5):
"""Find the price of a bond that creates equal utility at time 0 as adding `payment` to the value of
consumption in the final period. The purpose of this function is to find the interest rate
embedded in the `EZUtility` model.
Parameters
----------
m : ndarray or list
array of mitigation
utility : `Utility` object
object of utility class
payment : float
value added to consumption in the final period
a : float, optional
initial guess
b : float, optional
initial guess - f(b) needs to give different sign than f(a)
Returns
-------
tuple
result of optimization
Note
----
requires the 'scipy' package
"""
def min_func(price):
period_cons_eps = np.zeros(int(utility.decision_times[-1]/utility.period_len) + 1)
period_cons_eps[-2] = payment
utility_with_payment = utility.adjusted_utility(m, period_cons_eps=period_cons_eps)
first_period_eps = payment * price
utility_with_initial_payment = utility.adjusted_utility(m, first_period_consadj=first_period_eps)
return utility_with_payment - utility_with_initial_payment
return brentq(min_func, a, b)
[docs]def find_bec(m, utility, constraint_cost, a=-0.1, b=1.5):
"""Used to find a value for consumption that equalizes utility at time 0 in two different solutions.
Parameters
----------
m : ndarray or list
array of mitigation
utility : `Utility` object
object of utility class
constraint_cost : float
utility cost of constraining period 0 to zero
a : float, optional
initial guess
b : float, optional
initial guess - f(b) needs to give different sign than f(a)
Returns
-------
tuple
result of optimization
Note
----
requires the 'scipy' package
"""
def min_func(delta_con):
base_utility = utility.utility(m)
new_utility = utility.adjusted_utility(m, first_period_consadj=delta_con)
print(base_utility, new_utility, constraint_cost)
return new_utility - base_utility - constraint_cost
return brentq(min_func, a, b)
[docs]def perpetuity_yield(price, start_date, a=0.1, b=10.0):
"""Find the yield of a perpetuity starting at year `start_date`.
Parameters
----------
price : float
price of bond ending at `start_date`
start_date : int
start year of perpetuity
a : float, optional
initial guess
b : float, optional
initial guess - f(b) needs to give different sign than f(a)
Returns
-------
tuple
result of optimization
Note
----
requires the 'scipy' package
"""
def min_func(perp_yield):
return price - (100. / (perp_yield+100.))**start_date * (perp_yield + 100)/perp_yield
return brentq(min_func, a, b)
[docs]class ClimateOutput(object):
"""Calculate and save output from the EZ-Climate model.
Parameters
----------
utility : `Utility` object
object of utility class
Attributes
----------
utility : `Utility` object
object of utility class
prices : ndarray
SCC prices
ave_mitigations : ndarray
average mitigations
ave_emissions : ndarray
average emissions
expected_period_price : ndarray
expected SCC for the period
expected_period_mitigation : ndarray
expected mitigation for the period
expected_period_emissions : ndarray
expected emission for the period
"""
def __init__(self, utility):
self.utility = utility
self.prices = None
self.ave_mitigations = None
self.ave_emissions = None
self.expected_period_price = None
self.expected_period_mitigation = None
self.expected_period_emissions = None
self.ghg_levels = None
[docs] def calculate_output(self, m):
"""Calculated values based on optimal mitigation. For every **node** the function calculates and saves
* average mitigation
* average emission
* GHG level
* SCC
as attributes.
For every **period** the function also calculates and saves
* expected SCC/price
* expected mitigation
* expected emission
as attributes.
Parameters
----------
m : ndarray or list
array of mitigation
"""
bau = self.utility.damage.bau
tree = self.utility.tree
periods = tree.num_periods
self.prices = np.zeros(len(m))
self.ave_mitigations = np.zeros(len(m))
self.ave_emissions = np.zeros(len(m))
self.expected_period_price = np.zeros(periods)
self.expected_period_mitigation = np.zeros(periods)
self.expected_period_emissions = np.zeros(periods)
additional_emissions = additional_ghg_emission(m, self.utility)
self.ghg_levels = self.utility.damage.ghg_level(m)
for period in range(0, periods):
years = tree.decision_times[period]
period_years = tree.decision_times[period+1] - tree.decision_times[period]
nodes = tree.get_nodes_in_period(period)
num_nodes_period = 1 + nodes[1] - nodes[0]
period_lens = tree.decision_times[:period+1]
for node in range(nodes[0], nodes[1]+1):
path = np.array(tree.get_path(node, period))
new_m = m[path]
mean_mitigation = np.dot(new_m, period_lens) / years
price = self.utility.cost.price(years, m[node], mean_mitigation)
self.prices[node] = price
self.ave_mitigations[node] = self.utility.damage.average_mitigation_node(m, node, period)
self.ave_emissions[node] = additional_emissions[node] / (period_years*bau.emission_to_bau)
probs = tree.get_probs_in_period(period)
self.expected_period_price[period] = np.dot(self.prices[nodes[0]:nodes[1]+1], probs)
self.expected_period_mitigation[period] = np.dot(self.ave_mitigations[nodes[0]:nodes[1]+1], probs)
self.expected_period_emissions[period] = np.dot(self.ave_emissions[nodes[0]:nodes[1]+1], probs)
[docs] def save_output(self, m, prefix=None):
"""Function to save calculated values in `calculate_output` in the file `prefix` + 'node_period_output'
in the 'data' directory in the current working directory.
The function also saves the values calculated in the utility function in the file
`prefix` + 'tree' in the 'data' directory in the current working directory.
If there is no 'data' directory, one is created.
Parameters
----------
m : ndarray or list
array of mitigation
prefix : str, optional
prefix to be added to file_name
"""
utility_tree, cons_tree, cost_tree, ce_tree = self.utility.utility(m, return_trees=True)
if prefix is not None:
prefix += "_"
else:
prefix = ""
write_columns_csv([m, self.prices, self.ave_mitigations, self.ave_emissions, self.ghg_levels],
prefix+"node_period_output", ["Node", "Mitigation", "Prices", "Average Mitigation",
"Average Emission", "GHG Level"], [range(len(m))])
append_to_existing([self.expected_period_price, self.expected_period_mitigation, self.expected_period_emissions],
prefix+"node_period_output", header=["Period", "Expected Price", "Expected Mitigation",
"Expected Emission"], index=[range(self.utility.tree.num_periods)], start_char='\n')
store_trees(prefix=prefix, Utility=utility_tree, Consumption=cons_tree,
Cost=cost_tree, CertainEquivalence=ce_tree)
[docs]class RiskDecomposition(object):
"""Calculate and save analysis of output from the EZ-Climate model.
Parameters
----------
utility : `Utility` object
object of utility class
Attributes
----------
utility : `Utility` object
object of utility class
sdf_tree : `BaseStorageTree` object
SDF for each node
expected_damages : ndarray
expected damages in each period
risk_premium : ndarray
risk premium in each period
expected_sdf : ndarray
expected SDF in each period
cross_sdf_damages : ndarray
cross term between the SDF and damages
discounted_expected_damages : ndarray
expected discounted damages for each period
net_discount_damages : ndarray
net discount damage, i.e. when cost is also accounted for
cov_term : ndarray
covariance between SDF and damages
"""
def __init__(self, utility):
self.utility = utility
self.sdf_tree = BigStorageTree(utility.period_len, utility.decision_times)
self.sdf_tree.set_value(0, np.array([1.0]))
n = len(self.sdf_tree)
self.expected_damages = np.zeros(n)
self.risk_premiums = np.zeros(n)
self.expected_sdf = np.zeros(n)
self.cross_sdf_damages = np.zeros(n)
self.discounted_expected_damages = np.zeros(n)
self.net_discount_damages = np.zeros(n)
self.cov_term = np.zeros(n)
self.expected_sdf[0] = 1.0
[docs] def sensitivity_analysis(self, m):
"""Calculate sensitivity analysis based on the optimal mitigation. For every sub-period, i.e. the
periods given by the utility calculations, the function calculates and saves:
* discount prices
* net expected damages
* expected damages
* discounted expected damages
* risk premium
* cross SDF & damages
* covariance between SDF and damages
as attributes.
Parameters
----------
m : ndarray or list
array of mitigation
utility : `Utility` object
object of utility class
prefix : str, optional
prefix to be added to file_name
"""
utility_tree, cons_tree, cost_tree, ce_tree = self.utility.utility(m, return_trees=True)
cost_sum = 0
self.delta_cons_tree, self.delta_cost_array, delta_utility = delta_consumption(m, self.utility, cons_tree, cost_tree, 0.01)
mu_0, mu_1, mu_2 = self.utility.marginal_utility(m, utility_tree, cons_tree, cost_tree, ce_tree)
sub_len = self.sdf_tree.subinterval_len
i = 1
for period in self.sdf_tree.periods[1:]:
node_period = self.sdf_tree.decision_interval(period)
period_probs = self.utility.tree.get_probs_in_period(node_period)
expected_damage = np.dot(self.delta_cons_tree[period], period_probs)
self.expected_damages[i] = expected_damage
if self.sdf_tree.is_information_period(period-self.sdf_tree.subinterval_len):
total_probs = period_probs[::2] + period_probs[1::2]
mu_temp = np.zeros(2*len(mu_1[period-sub_len]))
mu_temp[::2] = mu_1[period-sub_len]
mu_temp[1::2] = mu_2[period-sub_len]
sdf = (np.repeat(total_probs, 2) / period_probs) * (mu_temp/np.repeat(mu_0[period-sub_len], 2))
period_sdf = np.repeat(self.sdf_tree.tree[period-sub_len],2)*sdf
else:
sdf = mu_1[period-sub_len]/mu_0[period-sub_len]
period_sdf = self.sdf_tree[period-sub_len]*sdf
self.expected_sdf[i] = np.dot(period_sdf, period_probs)
self.cross_sdf_damages[i] = np.dot(period_sdf, self.delta_cons_tree[period]*period_probs)
self.cov_term[i] = self.cross_sdf_damages[i] - self.expected_sdf[i]*expected_damage
self.sdf_tree.set_value(period, period_sdf)
if i < len(self.delta_cost_array):
self.net_discount_damages[i] = -(expected_damage + self.delta_cost_array[i, 1]) * self.expected_sdf[i] / self.delta_cons_tree[0]
cost_sum += -self.delta_cost_array[i, 1] * self.expected_sdf[i] / self.delta_cons_tree[0]
else:
self.net_discount_damages[i] = -expected_damage * self.expected_sdf[i] / self.delta_cons_tree[0]
self.risk_premiums[i] = -self.cov_term[i]/self.delta_cons_tree[0]
self.discounted_expected_damages[i] = -expected_damage * self.expected_sdf[i] / self.delta_cons_tree[0]
i += 1
[docs] def save_output(self, m, prefix=None):
"""Save attributes calculated in `sensitivity_analysis` into the file prefix + `sensitivity_output`
in the `data` directory in the current working directory.
Furthermore, the perpetuity yield, the discount factor for the last period is calculated, and SCC,
expected damage and risk premium for the first period is calculated and saved in into the file
prefix + `tree` in the `data` directory in the current working directory. If there is no `data` directory,
one is created.
Parameters
----------
m : ndarray or list
array of mitigation
prefix : str, optional
prefix to be added to file_name
"""
end_price = find_term_structure(m, self.utility, 0.01)
perp_yield = perpetuity_yield(end_price, self.sdf_tree.periods[-2])
damage_scale = self.utility.cost.price(0, m[0], 0) / (self.net_discount_damages.sum()+self.risk_premiums.sum())
scaled_discounted_ed = self.net_discount_damages * damage_scale
scaled_risk_premiums = self.risk_premiums * damage_scale
if prefix is not None:
prefix += "_"
else:
prefix = ""
write_columns_csv([self.expected_sdf, self.net_discount_damages, self.expected_damages, self.risk_premiums,
self.cross_sdf_damages, self.discounted_expected_damages, self.cov_term,
scaled_discounted_ed, scaled_risk_premiums], prefix + "sensitivity_output",
["Year", "Discount Prices", "Net Expected Damages", "Expected Damages", "Risk Premium",
"Cross SDF & Damages", "Discounted Expected Damages", "Cov Term", "Scaled Net Expected Damages",
"Scaled Risk Premiums"], [self.sdf_tree.periods.astype(int)+2015])
append_to_existing([[end_price], [perp_yield], [scaled_discounted_ed.sum()], [scaled_risk_premiums.sum()],
[self.utility.cost.price(0, m[0], 0)]], prefix+"sensitivity_output",
header=["Zero Bound Price", "Perp Yield", "Expected Damages", "Risk Premium",
"SCC"], start_char='\n')
store_trees(prefix=prefix, SDF=self.sdf_tree, DeltaConsumption=self.delta_cons_tree)
[docs]class ConstraintAnalysis(object):
def __init__(self, run_name, utility, const_value, opt_m=None):
self.run_name = run_name
self.utility = utility
self.cfp_m = constraint_first_period(utility, const_value, utility.tree.num_decision_nodes)
self.opt_m = opt_m
if self.opt_m is None:
self.opt_m = self._get_optimal_m()
self.con_cost = self._constraint_cost()
self.delta_u = self._first_period_delta_udiff()
self.delta_c = self._delta_consumption()
self.delta_c_billions = self.delta_c * self.utility.cost.cons_per_ton \
* self.utility.damage.bau.emit_level[0]
self.delta_emission_gton = self.opt_m[0]*self.utility.damage.bau.emit_level[0]
self.deadweight = self.delta_c*self.utility.cost.cons_per_ton / self.opt_m[0]
self.delta_u2 = self._first_period_delta_udiff2()
self.marginal_benefit = (self.delta_u2 / self.delta_u) * self.utility.cost.cons_per_ton
self.marginal_cost = self.utility.cost.price(0, self.cfp_m[0], 0)
def _get_optimal_m(self):
try:
header, index, data = import_csv(self.run_name+"_node_period_output")
except:
print("No such file for the optimal mitigation..")
return data[:, 0]
def _constraint_cost(self):
opt_u = self.utility.utility(self.opt_m)
cfp_u = self.utility.utility(self.cfp_m)
return opt_u - cfp_u
def _delta_consumption(self):
return find_bec(self.cfp_m, self.utility, self.con_cost)
def _first_period_delta_udiff(self):
u_given_delta_con = self.utility.adjusted_utility(self.cfp_m, first_period_consadj=0.01)
cfp_u = self.utility.utility(self.cfp_m)
return u_given_delta_con - cfp_u
def _first_period_delta_udiff2(self):
m = self.cfp_m.copy()
m[0] += 0.01
u = self.utility.utility(m)
cfp_u = self.utility.utility(self.cfp_m)
return u - cfp_u
[docs] def save_output(self, prefix=None):
if prefix is not None:
prefix += "_"
else:
prefix = ""
write_columns_csv([self.con_cost, [self.delta_c], [self.delta_c_billions], [self.delta_emission_gton],
[self.deadweight], self.delta_u, self.marginal_benefit, [self.marginal_cost]],
prefix + self.run_name + "_constraint_output",
["Constraint Cost", "Delta Consumption", "Delta Consumption $b",
"Delta Emission Gton", "Deadweight Cost", "Marginal Impact Utility",
"Marginal Benefit Emissions Reduction", "Marginal Cost Emission Reduction"])