I'm trying to improve my algorithm to make it faster. It would also help to have a few extra sets of eyes to catch any bugs or gaps in logic I missed. One thing to note: because of the nature of the algorithm, there will be dead zones at the beginning and end of the data where a peak/valley cannot be detected.
The gist of the algorithm is to step through overlapping windows of the data, detrend each window with a linear fit, and pick out points by standard deviation. Because the windows overlap, we can decide how many "angles" (windows) a point needs to be flagged in before it counts as a valid peak. Then we keep only the highest peak (or lowest valley) from each group of consecutive detections.
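To make the voting idea concrete, here is a minimal sketch (all names hypothetical, not from the code below) of how the overlap determines the window step and how a `Counter` tallies per-index detections, so a vote threshold like `req_angles` can be applied:

```python
from collections import Counter

# step size derived from overlap, as in the algorithm described above
window_size, overlap = 10, 0.5
step = max(1, int(round(window_size * (1 - overlap))))  # -> 5

# pretend index 12 is flagged as a peak in every window that covers it
hits = []
n = 30
for start in range(0, n - window_size + 1, step):
    if start <= 12 < start + window_size:
        hits.append(12)

votes = Counter(hits)
print(votes[12])  # number of windows in which index 12 was flagged -> 2
```

With `req_angles = 2`, index 12 would survive the vote; with `req_angles = 3`, it would be discarded.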
I have highly stationary data; what I may or may not want to detect as a peak varies, and I wanted the most robust algorithm so I can use something like Bayesian optimization to set the best parameters.
This is more or less how I'd like my algorithm to function for my specific use case; I'm not really looking for feedback on that, more on whether my code translates these goals correctly and if it's the most computationally efficient way to do things.
import numpy as np
from collections import Counter
from scipy.signal import detrend


def get_peak_valley(arr, threshold, window_size, overlap, req_angles):
    # validate params
    window_size = int(round(window_size))
    req_angles = max(1, int(round(req_angles)))
    window_step = max(1, int(round(window_size * (1 - overlap))))

    # get all points that classify as a peak/valley
    ind = 0
    peak_inds, valley_inds = [], []
    while ind + window_size <= len(arr):
        flattened = detrend(arr[ind:ind + window_size])
        std, avg = np.std(flattened), np.mean(flattened)
        lower_b = avg - std * threshold
        upper_b = avg + std * threshold
        for idx, val in enumerate(flattened):
            if val < lower_b:
                valley_inds.append(idx + ind)
            elif val > upper_b:
                peak_inds.append(idx + ind)
        ind += window_step

    # discard points flagged in fewer windows than req_angles
    peak_counts = Counter(peak_inds)
    pk_inds = [i for i, c in peak_counts.items() if c >= req_angles]
    valley_counts = Counter(valley_inds)
    vly_inds = [i for i, c in valley_counts.items() if c >= req_angles]

    if len(pk_inds) == 0 or len(vly_inds) == 0:
        return pk_inds, vly_inds

    # set membership is O(1); `x in pk_inds` on a list is O(n) per event
    pk_set = set(pk_inds)

    # seed the iterator with the first event so best_ind starts valid
    # (initializing best_ind to 0 silently returns index 0 whenever the
    # first group's opening value is never beaten)
    event_inds = sorted(pk_inds + vly_inds)
    best_ind = event_inds[0]
    best_val = arr[best_ind]
    curr_event = 'peak' if best_ind in pk_set else 'valley'

    # carry forward only the extreme index of each consecutive group
    new_vly_inds, new_pk_inds = [], []
    for x in event_inds[1:]:
        is_peak = x in pk_set
        if is_peak and curr_event == 'valley':
            new_vly_inds.append(best_ind)
            curr_event, best_val, best_ind = 'peak', arr[x], x
        elif not is_peak and curr_event == 'peak':
            new_pk_inds.append(best_ind)
            curr_event, best_val, best_ind = 'valley', arr[x], x
        elif is_peak and arr[x] > best_val:
            best_val, best_ind = arr[x], x
        elif not is_peak and arr[x] < best_val:
            best_val, best_ind = arr[x], x

    # flush the final group
    if curr_event == 'valley':
        new_vly_inds.append(best_ind)
    else:
        new_pk_inds.append(best_ind)
    return new_pk_inds, new_vly_inds
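On the efficiency question, one useful speed and correctness baseline (a suggestion, not part of your algorithm) is `scipy.signal.find_peaks`, whose `prominence` parameter plays a role loosely analogous to your std-based threshold; comparing its output and runtime against yours on the same series can tell you how much headroom is left:

```python
import numpy as np
from scipy.signal import find_peaks

rng = np.random.default_rng(0)
x = np.cumsum(rng.standard_normal(500))  # synthetic non-stationary series

# peaks are local maxima of x; valleys are local maxima of -x
peaks, _ = find_peaks(x, prominence=3.0)
valleys, _ = find_peaks(-x, prominence=3.0)
print(len(peaks), len(valleys))
```

`prominence` (and `distance`) are also continuous knobs, so they slot into a Bayesian-optimization search just as easily as your `threshold` / `window_size` / `overlap` / `req_angles`.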