-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoptimisers.py
More file actions
109 lines (94 loc) · 3.51 KB
/
optimisers.py
File metadata and controls
109 lines (94 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from abc import ABC
from dataclasses import dataclass
import numpy as np
import pandas as pd
@dataclass
class SetPointOptimiser(ABC):
pass
@dataclass
class PeakShave(SetPointOptimiser):
@staticmethod
def cumulative_peak_areas(sorted_arr: np.ndarray):
delta_energy = np.append(np.diff(sorted_arr), 0)
reverse_index = np.array(range(len(sorted_arr) - 1, -1, -1))
delta_area = delta_energy * reverse_index
return np.cumsum(np.flip(delta_area))
@staticmethod
def peak_area_idx(peak_areas, area, max_idx=None):
idx = np.searchsorted(peak_areas, area) - 1
if max_idx:
return min(idx, max_idx)
else:
return idx
@staticmethod
def sub_load_peak_shave_limit(
sorted_df: pd.DataFrame,
area: float,
gross_col: str,
sub_col: str,
balance_col: str,
) -> float:
for i, row in sorted_df.iloc[::-1].iterrows():
trial_threshold = row[gross_col]
exposed_gross = sorted_df[gross_col].values - trial_threshold
exposed_gross = np.where(exposed_gross < 0.0, 0.0, exposed_gross)
exposed_balance = sorted_df[balance_col].values - trial_threshold
exposed_balance = np.where(exposed_balance < 0.0, 0.0, exposed_balance)
exposed_sub = exposed_gross - exposed_balance
exposed_sub_area = sum(exposed_sub)
if exposed_sub_area >= area:
return trial_threshold
return max(0.0, min(sorted_df[sub_col]))
@staticmethod
def peak_shave(demand_arr: np.ndarray, area):
sorted_arr = np.sort(demand_arr)
peak_areas = PeakShave.cumulative_peak_areas(sorted_arr)
index = PeakShave.peak_area_idx(
peak_areas,
area
)
return np.flip(sorted_arr)[index]
@dataclass
class TOUShiftingCalculator(SetPointOptimiser):
@staticmethod
def inverted_arr(arr):
""" Flip array vertically such that troughs become peaks
and the flipped peak is equal to the un-flipped peak
"""
return arr.max() - arr
@staticmethod
def cap_area(arr: np.ndarray):
cap_arr = arr - arr.min()
return cap_arr.sum()
@staticmethod
def cap_height(arr: np.ndarray):
cap_arr = arr - arr.min()
return cap_arr.max()
@staticmethod
def additional_depth(arr: np.ndarray, area_required: float):
width = len(arr)
return area_required / width
@staticmethod
def calculate_setpoint(demand_arr: np.ndarray, area: float):
setpoint = demand_arr.max()
if area:
cap_area = TOUShiftingCalculator.cap_area(demand_arr)
additional_area_required = area - cap_area
if additional_area_required > 0.0:
additional_depth_required = TOUShiftingCalculator.additional_depth(
demand_arr,
additional_area_required
)
total_depth = \
additional_depth_required + TOUShiftingCalculator.cap_height(demand_arr)
setpoint = demand_arr.max() - total_depth
else:
setpoint = PeakShave.peak_shave(demand_arr, area)
return setpoint
@staticmethod
def charge_setpoint(demand_arr: np.ndarray, area: float):
inverted_arr = TOUShiftingCalculator.inverted_arr(demand_arr)
return demand_arr.max() - TOUShiftingCalculator.calculate_setpoint(
inverted_arr,
area
)