Source code for ezclimate.storage_tree

from __future__ import division
import numpy as np
from abc import ABCMeta, abstractmethod

[docs]class BaseStorageTree(object): """Abstract storage class for the EZ-Climate model. Parameters ---------- decision_times : ndarray or list array of years from start where decisions about mitigation levels are done Attributes ---------- decision_times : ndarray array of years from start where decisions about mitigation levels are done information_times : ndarray array of years where new information is given to the agent in the model periods : ndarray periods in the tree tree : dict dictionary where keys are `periods` and values are nodes in period """ __metaclass__ = ABCMeta def __init__(self, decision_times): self.decision_times = decision_times if isinstance(decision_times, list): self.decision_times = np.array(decision_times) self.information_times = self.decision_times[:-2] self.periods = None self.tree = None def __len__(self): return len(self.tree)
[docs] def __getitem__(self, key): if isinstance(key, int) or isinstance(key, float): return self.tree.__getitem__(key).copy() else: raise TypeError('Index must be int, not {}'.format(type(key).__name__))
def _init_tree(self): self.tree = dict.fromkeys(self.periods) i = 0 for key in self.periods: self.tree[key] = np.zeros(2**i) if key in self.information_times: i += 1 @property def last(self): """ndarray: last period's array.""" return self.tree[self.decision_times[-1]] @property def last_period(self): """int: index of last period.""" return self.decision_times[-1] @property def nodes(self): """int: number of nodes in the tree.""" n = 0 for array in self.tree.values(): n += len(array) return n @abstractmethod
[docs] def get_next_period_array(self, period): """Return the array of the next period from `periods`.""" pass
[docs] def set_value(self, period, values): """If period is in periods, set the value of element to `values` (ndarray).""" if period not in self.periods: raise ValueError("Not a valid period") if isinstance(values, list): values = np.array(values) if self.tree[period].shape != values.shape: raise ValueError("shapes {} and {} not aligned".format(self.tree[period].shape, values.shape)) self.tree[period] = values
[docs] def is_decision_period(self, time_period): """Checks if time_period is a decision time for mitigation, where time_period is the number of years since start. Parameters ---------- time_period : int time since the start year of the model Returns ------- bool True if time_period also is a decision time, else False """ return time_period in self.decision_times
[docs] def is_real_decision_period(self, time_period): """Checks if time_period is a decision time besides the last period, where time_period is the number of years since start. Parameters ---------- time_period : int time since the start year of the model Returns ------- bool True if time_period also is a real decision time, else False """ return time_period in self.decision_times[:-1]
[docs] def is_information_period(self, time_period): """Checks if time_period is a information time for fragility, where time_period is the number of years since start. Parameters ---------- time_period : int time since the start year of the model Returns ------- bool True if time_period also is an information time, else False """ return time_period in self.information_times
[docs] def write_tree(self, file_name, header, delimiter=";"): """Save values in `tree` as a tree into file `file_name` in the 'data' directory in the current working directory. If there is no 'data' directory, one is created. Parameters ---------- file_name : str name of saved file header : str first row of file delimiter : str, optional delimiter in file """ from tools import find_path import csv real_times = self.decision_times[:-1] size = len(self.tree[real_times[-1]]) output_lst = [] prev_k = size for t in real_times: temp_lst = [""]*(size*2) k = int(size/len(self.tree[t])) temp_lst[k::prev_k] = self.tree[t].tolist() output_lst.append(temp_lst) prev_k = k write_lst = zip(*output_lst) d = find_path(file_name) with open(d, 'wb') as f: writer = csv.writer(f, delimiter=delimiter) writer.writerow([header]) for row in write_lst: writer.writerow(row)
[docs] def write_columns(self, file_name, header, start_year=2015, delimiter=";"): """Save values in `tree` as columns into file `file_name` in the 'data' directory in the current working directory. If there is no 'data' directory, one is created. +------------+------------+-----------+ | Year | Node | header | +============+============+===========+ | start_year | 0 | val0 | +------------+------------+-----------+ | .. | .. | .. | +------------+------------+-----------+ Parameters ---------- file_name : str name of saved file header : str description of values in tree start_year : int, optional start year of analysis delimiter : str, optional delimiter in file """ from tools import write_columns_csv, file_exists if file_exists(file_name): self.write_columns_existing(file_name, header) else: real_times = self.decision_times[:-1] years = [] nodes = [] output_lst = [] k = 0 for t in real_times: for n in range(len(self.tree[t])): years.append(t+start_year) nodes.append(k) output_lst.append(self.tree[t][n]) k += 1 write_columns_csv(lst=[output_lst], file_name=file_name, header=["Year", "Node", header], index=[years, nodes], delimiter=delimiter)
[docs] def write_columns_existing(self, file_name, header, delimiter=";"): """Save values in `tree` as columns into file `file_name` in the 'data' directory in the current working directory, when `file_name` already exists. If there is no 'data' directory, one is created. +------------+------------+-----------------+------------------+ | Year | Node | other_header | header | +============+============+=================+==================+ | start_year | 0 | other_val0 | val0 | +------------+------------+-----------------+------------------+ | .. | .. | .. | .. | +------------+------------+-----------------+------------------+ Parameters ---------- file_name : str name of saved file header : str description of values in tree start_year : int, optional start year of analysis delimiter : str, optional delimiter in file """ from tools import write_columns_to_existing output_lst = [] for t in self.decision_times[:-1]: output_lst.extend(self.tree[t]) write_columns_to_existing(lst=output_lst, file_name=file_name, header=header)
[docs]class SmallStorageTree(BaseStorageTree): """Storage tree class for the EZ-Climate model. No storage in nodes between periods in `decision_times`. Parameters ---------- decision_times : ndarray or list array of years from start where decisions about mitigation levels are done Attributes ---------- decision_times : ndarray array of years from start where decisions about mitigation levels are done information_times : ndarray array of years where new information is given to the agent in the model periods : ndarray periods in the tree tree : dict dictionary where keys are `periods` and values are nodes in period """ def __init__(self, decision_times): super(SmallStorageTree, self).__init__(decision_times) self.periods = self.decision_times self._init_tree()
[docs] def get_next_period_array(self, period): """Returns the array of the next decision period. Parameters ---------- period : int period Examples -------- >>> sst = SmallStorageTree([0, 15, 45, 85, 185, 285, 385]) >>> sst.get_next_period_array(0) array([0., 0.]) >>> sst.get_next_period_array(15) array([ 0., 0., 0., 0.]) Raises ------ IndexError If `period` is not in real decision times """ if self.is_real_decision_period(period): index = self.decision_times[np.where(self.decision_times==period)[0]+1][0] return self.tree[index].copy() raise IndexError("Given period is not in real decision times")
[docs] def index_below(self, period): """Returns the key of the previous decision period. Parameters ---------- period : int period Examples -------- >>> sst = SmallStorageTree([0, 15, 45, 85, 185, 285, 385]) >>> sst.index_below(15) 0 Raises ------ IndexError If `period` is not in decision times or first element in decision times """ if period in self.decision_times[1:]: period = self.decision_times[np.where(self.decision_times==period)[0]-1] return period[0] raise IndexError("Period not in decision times or first period")
[docs]class BigStorageTree(BaseStorageTree): """Storage tree class for the EZ-Climate model. Storage in nodes between periods in `decision_times`. Parameters ---------- subintervals_len : float years between periods in tree decision_times : ndarray or list array of years from start where decisions about mitigation levels are done Attributes ---------- decision_times : ndarray array of years from start where decisions about mitigation levels are done information_times : ndarray array of years where new information is given to the agent in the model periods : ndarray periods in the tree tree : dict dictionary where keys are `periods` and values are nodes in period subintervals_len : float years between periods in tree """ def __init__(self, subinterval_len, decision_times): super(BigStorageTree, self).__init__(decision_times) self.subinterval_len = subinterval_len self.periods = np.arange(0, self.decision_times[-1]+self.subinterval_len, self.subinterval_len) self._init_tree() @property def first_period_intervals(self): """ndarray: the number of subintervals in the first period.""" return int((self.decision_times[1] - self.decision_times[0]) / self.subinterval_len)
[docs] def get_next_period_array(self, period): """Returns the array of the next period. Parameters ---------- period : int period Examples -------- >>> bst = BigStorageTree(5.0, [0, 15, 45, 85, 185, 285, 385]) >>> sst.get_next_period_array(0) array([0., 0.]) >>> sst.get_next_period_array(10) array([ 0., 0., 0., 0.]) Raises ------ IndexError If `period` is not a valid period or too large """ if period + self.subinterval_len <= self.decision_times[-1]: return self.tree[period+self.subinterval_len].copy() raise IndexError("Period is not a valid period or too large")
[docs] def between_decision_times(self, period): """Check which decision time the period is between and returns the index of the lower decision time. Parameters ---------- period : int period Returns ------- int index Examples -------- >>> bst = BigStorageTree(5, [0, 15, 45, 85, 185, 285, 385]) >>> bst.between_decision_times(5) 0 >>> bst.between_decision_times(15) 1 """ if period == 0: return 0 for i in range(len(self.information_times)): if self.decision_times[i] <= period and period < self.decision_times[i+1]: return i return i+1
[docs] def decision_interval(self, period): """Check which interval the period is between. Parameters ---------- period : int period Returns ------- int index Examples -------- >>> bst = BigStorageTree(5, [0, 15, 45, 85, 185, 285, 385]) >>> bst.decision_interval(5) 1 >>> bst.between_decision_times(15) 1 >>> bst.between_decision_times(20) 2 """ if period == 0: return 0 for i in range(1, len(self.decision_times)): if self.decision_times[i-1] < period and period <= self.decision_times[i]: return i return i