from __future__ import division, print_function
import numpy as np
from abc import ABCMeta, abstractmethod
from damage_simulation import DamageSimulation
from forcing import Forcing
[docs]class Damage(object):
"""Abstract damage class for the EZ-Climate model.
Parameters
----------
tree : `TreeModel` object
provides the tree structure used
bau : `BusinessAsUsual` object
business-as-usual scenario of emissions
Attributes
----------
tree : `TreeModel` object
provides the tree structure used
bau : `BusinessAsUsual` object
business-as-usual scenario of emissions
"""
__metaclass__ = ABCMeta
def __init__(self, tree, bau):
self.tree = tree
self.bau = bau
@abstractmethod
[docs] def average_mitigation(self):
"""The average_mitigation function should return a 1D array of the
average mitigation for every node in the period.
"""
pass
@abstractmethod
[docs] def damage_function(self):
"""The damage_function should return a 1D array of the damages for
every node in the period.
"""
pass
[docs]class DLWDamage(Damage):
"""Damage class for the EZ-Climate model. Provides the damages from emissions and mitigation outcomes.
Parameters
----------
tree : `TreeModel` object
provides the tree structure used
bau : `BusinessAsUsual` object
business-as-usual scenario of emissions
cons_growth : float
constant consumption growth rate
ghg_levels : ndarray or list
end GHG levels for each end scenario
Attributes
----------
tree : `TreeModel` object
provides the tree structure used
bau : `BusinessAsUsual` object
business-as-usual scenario of emissions
cons_growth : float
constant consumption growth rate
ghg_levels : ndarray or list
end GHG levels for each end scenario
dnum : int
number of simulated damage paths
d : ndarray
simulated damages
d_rcomb : ndarray
adjusted simulated damages for recombining tree
cum_forcing : ndarray
cumulative forcing interpolation coeffiecients, used to calculate forcing based mitigation
forcing : `Forcing` object
class for calculating cumulative forcing and GHG levels
damage_coefs : ndarray
interpolation coefficients used to calculate damages
"""
def __init__(self, tree, bau, cons_growth, ghg_levels, subinterval_len):
super(DLWDamage, self).__init__(tree, bau)
self.ghg_levels = ghg_levels
if isinstance(self.ghg_levels, list):
self.ghg_levels = np.array(self.ghg_levels)
self.cons_growth = cons_growth
self.dnum = len(ghg_levels)
self.subinterval_len = subinterval_len
self.cum_forcings = None
self.d = None
self.d_rcomb = None
self.emit_pct = None
self.damage_coefs = None
def _recombine_nodes(self):
"""Creating damage coefficients for recombining tree. The state reached by an up-down move is
separate from a down-up move because in general the two paths will lead to different degrees of
mitigation and therefore of GHG level. A 'recombining' tree is one in which the movement from
one state to the next through time is nonetheless such that an up move followed by a down move
leads to the same fragility.
"""
nperiods = self.tree.num_periods
sum_class = np.zeros(nperiods, dtype=int)
new_state = np.zeros([nperiods, self.tree.num_final_states], dtype=int)
temp_prob = self.tree.final_states_prob.copy()
self.d_rcomb = self.d.copy()
for old_state in range(self.tree.num_final_states):
temp = old_state
n = nperiods-2
d_class = 0
while n >= 0:
if temp >= 2**n:
temp -= 2**n
d_class += 1
n -= 1
sum_class[d_class] += 1
new_state[d_class, sum_class[d_class]-1] = old_state
sum_nodes = np.append(0, sum_class.cumsum())
prob_sum = np.array([self.tree.final_states_prob[sum_nodes[i]:sum_nodes[i+1]].sum() for i in range(len(sum_nodes)-1)])
for period in range(nperiods):
for k in range(self.dnum):
d_sum = np.zeros(nperiods)
old_state = 0
for d_class in range(nperiods):
d_sum[d_class] = (self.tree.final_states_prob[old_state:old_state+sum_class[d_class]] \
* self.d_rcomb[k, old_state:old_state+sum_class[d_class], period]).sum()
old_state += sum_class[d_class]
self.tree.final_states_prob[new_state[d_class, 0:sum_class[d_class]]] = temp_prob[0]
for d_class in range(nperiods):
self.d_rcomb[k, new_state[d_class, 0:sum_class[d_class]], period] = d_sum[d_class] / prob_sum[d_class]
self.tree.node_prob[-len(self.tree.final_states_prob):] = self.tree.final_states_prob
for p in range(1,nperiods-1):
nodes = self.tree.get_nodes_in_period(p)
for node in range(nodes[0], nodes[1]+1):
worst_end_state, best_end_state = self.tree.reachable_end_states(node, period=p)
self.tree.node_prob[node] = self.tree.final_states_prob[worst_end_state:best_end_state+1].sum()
def _damage_interpolation(self):
"""Create the interpolation coeffiecients used in `damage_function`."""
if self.d is None:
print("Importing stored damage simulation")
self.import_damages()
self._recombine_nodes()
if self.emit_pct is None:
bau_emission = self.bau.ghg_end - self.bau.ghg_start
self.emit_pct = 1.0 - (self.ghg_levels-self.bau.ghg_start) / bau_emission
self.damage_coefs = np.zeros((self.tree.num_final_states, self.tree.num_periods, self.dnum-1, self.dnum))
amat = np.ones((self.tree.num_periods, self.dnum, self.dnum))
bmat = np.ones((self.tree.num_periods, self.dnum))
self.damage_coefs[:, :, -1, -1] = self.d_rcomb[-1, :, :]
self.damage_coefs[:, :, -1, -2] = (self.d_rcomb[-2, :, :] - self.d_rcomb[-1, :, :]) / self.emit_pct[-2]
amat[:, 0, 0] = 2.0 * self.emit_pct[-2]
amat[:, 1:, 0] = self.emit_pct[:-1]**2
amat[:, 1:, 1] = self.emit_pct[:-1]
amat[:, 0, -1] = 0.0
for state in range(0, self.tree.num_final_states):
bmat[:, 0] = self.damage_coefs[state, :, -1, -2] * self.emit_pct[-2]
bmat[:, 1:] = self.d_rcomb[:-1, state, :].T
self.damage_coefs[state, :, 0] = np.linalg.solve(amat, bmat)
[docs] def import_damages(self, file_name="simulated_damages"):
"""Import saved simulated damages. File must be saved in 'data' directory
inside current working directory. Save imported values in `d`.
Parameters
----------
file_name : str, optional
name of file of saved simulated damages
Raises
------
IOError
If file does not exist.
"""
from tools import import_csv
try:
d = import_csv(file_name, ignore="#", header=False)
except IOError as e:
import sys
print("Could not import simulated damages:\n\t{}".format(e))
sys.exit(0)
n = self.tree.num_final_states
self.d = np.array([d[n*i:n*(i+1)] for i in range(0, self.dnum)])
self._damage_interpolation()
[docs] def damage_simulation(self, draws, peak_temp=9.0, disaster_tail=12.0, tip_on=True,
temp_map=1, temp_dist_params=None, maxh=100.0, save_simulation=True):
"""Initializion and simulation of damages, given by :mod:`ez_climate.DamageSimulation`.
Parameters
----------
draws : int
number of Monte Carlo draws
peak_temp : float, optional
tipping point parameter
disaster_tail : float, optional
curvature of tipping point
tip_on : bool, optional
flag that turns tipping points on or off
temp_map : int, optional
mapping from GHG to temperature
* 0: implies Pindyck displace gamma
* 1: implies Wagner-Weitzman normal
* 2: implies Roe-Baker
* 3: implies user-defined normal
* 4: implies user-defined gamma
temp_dist_params : ndarray or list, optional
if temp_map is either 3 or 4, user needs to define the distribution parameters
maxh : float, optional
time paramter from Pindyck which indicates the time it takes for temp to get half
way to its max value for a given level of ghg
cons_growth : float, optional
yearly growth in consumption
save_simulation : bool, optional
True if simulated values should be save, False otherwise
Returns
-------
ndarray
simulated damages
"""
ds = DamageSimulation(tree=self.tree, ghg_levels=self.ghg_levels, peak_temp=peak_temp,
disaster_tail=disaster_tail, tip_on=tip_on, temp_map=temp_map,
temp_dist_params=temp_dist_params, maxh=maxh, cons_growth=self.cons_growth)
print("Starting damage simulation..")
self.d = ds.simulate(draws, write_to_file = save_simulation)
print("Done!")
self._damage_interpolation()
return self.d
def _forcing_based_mitigation(self, forcing, period):
"""Calculation of mitigation based on forcing up to period. Interpolating between the forcing associated
with the constant degree of mitigation consistent with the damage simulation scenarios.
"""
p = period - 1
if forcing > self.cum_forcings[p][1]:
weight_on_sim2 = (self.cum_forcings[p][2] - forcing) / (self.cum_forcings[p][2] - self.cum_forcings[p][1])
weight_on_sim3 = 0
elif forcing > self.cum_forcings[p][0]:
weight_on_sim2 = (forcing - self.cum_forcings[p][0]) / (self.cum_forcings[p][1] - self.cum_forcings[p][0])
weight_on_sim3 = (self.cum_forcings[p][1] - forcing) / (self.cum_forcings[p][1] - self.cum_forcings[p][0])
else:
weight_on_sim2 = 0
weight_on_sim3 = 1.0 + (self.cum_forcings[p][0] - forcing) / self.cum_forcings[p][0]
return weight_on_sim2 * self.emit_pct[1] + weight_on_sim3*self.emit_pct[0]
def _forcing_init(self):
"""Initialize `Forcing` object and cum_forcings used in calculating the force mitigation up to a node."""
if self.emit_pct is None:
bau_emission = self.bau.ghg_end - self.bau.ghg_start
self.emit_pct = 1.0 - (self.ghg_levels-self.bau.ghg_start) / bau_emission
self.cum_forcings = np.zeros((self.tree.num_periods, self.dnum))
mitigation = np.ones((self.dnum, self.tree.num_decision_nodes)) * self.emit_pct[:, np.newaxis]
for i in range(0, self.dnum):
for n in range(1, self.tree.num_periods+1):
node = self.tree.get_node(n, 0)
self.cum_forcings[n-1, i] = Forcing.forcing_at_node(mitigation[i], node, self.tree,
self.bau, self.subinterval_len)
[docs] def average_mitigation_node(self, m, node, period=None):
"""Calculate the average mitigation until node.
Parameters
----------
m : ndarray or list
array of mitigation
node : int
node for which average mitigation is to be calculated for
period : int, optional
the period the node is in
Returns
-------
float
average mitigation
"""
if period == 0:
return 0
if period is None:
period = self.tree.get_period(node)
state = self.tree.get_state(node, period)
path = self.tree.get_path(node, period)
new_m = m[path[:-1]]
period_len = self.tree.decision_times[1:period+1] - self.tree.decision_times[:period]
bau_emissions = self.bau.emission_by_decisions[:period]
total_emission = np.dot(bau_emissions, period_len)
ave_mitigation = np.dot(new_m, bau_emissions*period_len)
return ave_mitigation / total_emission
[docs] def average_mitigation(self, m, period):
"""Calculate the average mitigation for all node in a period.
m : ndarray or list
array of mitigation
period : int
period to calculate average mitigation for
Returns
-------
ndarray
average mitigations
"""
nodes = self.tree.get_num_nodes_period(period)
ave_mitigation = np.zeros(nodes)
for i in range(nodes):
node = self.tree.get_node(period, i)
ave_mitigation[i] = self.average_mitigation_node(m, node, period)
return ave_mitigation
def _ghg_level_node(self, m, node):
return Forcing.ghg_level_at_node(m, node, self.tree, self.bau, self.subinterval_len)
[docs] def ghg_level_period(self, m, period=None, nodes=None):
"""Calculate the GHG levels corresponding to the given mitigation.
Need to provide either `period` or `nodes`.
Parameters
----------
m : ndarray or list
array of mitigation
period : int, optional
what period to calculate GHG levels for
nodes : ndarray or list, optional
the nodes to calculate GHG levels for
Returns
-------
ndarray
GHG levels
"""
if nodes is None and period is not None:
start_node, end_node = self.tree.get_nodes_in_period(period)
if period >= self.tree.num_periods:
add = end_node-start_node+1
start_node += add
end_node += add
nodes = np.array(range(start_node, end_node+1))
if period is None and nodes is None:
raise ValueError("Need to give function either nodes or the period")
ghg_level = np.zeros(len(nodes))
for i in range(len(nodes)):
ghg_level[i] = self._ghg_level_node(m, nodes[i])
return ghg_level
[docs] def ghg_level(self, m, periods=None):
"""Calculate the GHG levels for more than one period.
Parameters
----------
m : ndarray or list
array of mitigation
periods : int, optional
number of periods to calculate GHG levels for
Returns
-------
ndarray
GHG levels
"""
if periods is None:
periods = self.tree.num_periods-1
if periods >= self.tree.num_periods:
ghg_level = np.zeros(self.tree.num_decision_nodes+self.tree.num_final_states)
else:
ghg_level = np.zeros(self.tree.num_decision_nodes)
for period in range(periods+1):
start_node, end_node = self.tree.get_nodes_in_period(period)
if period >= self.tree.num_periods:
add = end_node-start_node+1
start_node += add
end_node += add
nodes = np.array(range(start_node, end_node+1))
ghg_level[nodes] = self.ghg_level_period(m, nodes=nodes)
return ghg_level
def _damage_function_node(self, m, node):
"""Calculate the damage at any given node, based on mitigation actions in `m`."""
if self.damage_coefs is None:
self._damage_interpolation()
if self.cum_forcings is None:
self._forcing_init()
if node == 0:
return 0.0
period = self.tree.get_period(node)
forcing, ghg_level = Forcing.forcing_and_ghg_at_node(m, node, self.tree, self.bau, self.subinterval_len, "both")
force_mitigation = self._forcing_based_mitigation(forcing, period)
ghg_extension = 1.0 / (1 + np.exp(0.05*(ghg_level-200)))
worst_end_state, best_end_state = self.tree.reachable_end_states(node, period=period)
probs = self.tree.final_states_prob[worst_end_state:best_end_state+1]
if force_mitigation < self.emit_pct[1]:
damage = (probs *(self.damage_coefs[worst_end_state:best_end_state+1, period-1, 1, 1] * force_mitigation \
+ self.damage_coefs[worst_end_state:best_end_state+1, period-1, 1, 2])).sum()
elif force_mitigation < self.emit_pct[0]:
damage = (probs * (self.damage_coefs[worst_end_state:best_end_state+1, period-1, 0, 0] * force_mitigation**2 \
+ self.damage_coefs[worst_end_state:best_end_state+1, period-1, 0, 1] * force_mitigation \
+ self.damage_coefs[worst_end_state:best_end_state+1, period-1, 0, 2])).sum()
else:
damage = 0.0
i = 0
for state in range(worst_end_state, best_end_state+1):
if self.d_rcomb[0, state, period-1] > 1e-5:
deriv = 2.0 * self.damage_coefs[state, period-1, 0, 0]*self.emit_pct[0] \
+ self.damage_coefs[state, period-1, 0, 1]
decay_scale = deriv / (self.d_rcomb[0, state, period-1]*np.log(0.5))
dist = force_mitigation - self.emit_pct[0] + np.log(self.d_rcomb[0, state, period-1]) \
/ (np.log(0.5) * decay_scale)
damage += probs[i] * (0.5**(decay_scale*dist) * np.exp(-np.square(force_mitigation-self.emit_pct[0])/60.0))
i += 1
return (damage / probs.sum()) + ghg_extension
[docs] def damage_function(self, m, period):
"""Calculate the damage for every node in a period, based on mitigation actions `m`.
Parameters
----------
m : ndarray or list
array of mitigation
period : int
period to calculate damages for
Returns
-------
ndarray
damages
"""
nodes = self.tree.get_num_nodes_period(period)
damages = np.zeros(nodes)
for i in range(nodes):
node = self.tree.get_node(period, i)
damages[i] = self._damage_function_node(m, node)
return damages