Hardware: Raspberry Pi 3B+, Elecrow touchscreen, DFR0660 Barcode Scanner, ADS1115 ADC
I would like any bad practices/possible failure points pointed out specifically in the loop(), read_adc(), and catheter_test() functions. The write_to_report() and measure_single_catheter() functions were removed. The system is fully functional.
import sys
from time import sleep
import RPi.GPIO as GPIO
from os import system
from datetime import datetime
from fpdf import FPDF
from fpdf.enums import XPos, YPos
import pyautogui
import pygame
from itertools import chain
import subprocess
from hw_init import *
from cath_test_init import *
from gui_init import *
sys.path.append('/usr/lib/python39.zip')
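# NOTE: intended interpreter is python3. A shebang ("#!/usr/bin/env python3")
# only takes effect on the very first line of the file, so this is a comment.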
# VERBOSE LOGGING FOR TROUBLESHOOTING
# Passing "log" as the first command-line argument redirects everything
# written to stdout (which includes log_print output) into a dated log file
# with a relative path.  The length check keeps the script from dying with
# IndexError when it is started without any argument.
if len(sys.argv) > 1 and sys.argv[1] == 'log':
    logfile_date = datetime.now().strftime("%m_%d_%Y")
    logfile_name = 'logfile' + logfile_date + '.txt'
    sys.stdout = open(logfile_name, 'w')
def log_print(string):
    """Print ``string`` to stdout, prefixed by a timestamp and a tab.

    The timestamp uses the format ``MM_DD_YYYY_HH:MM:SS``.
    """
    stamp = datetime.now().strftime("%m_%d_%Y_%H:%M:%S")
    print("\t".join((stamp, string)))
class PDF(FPDF):
    """FPDF document that adds a model/version/page-number footer."""

    def footer(self):
        """Draw the page footer.

        First line: the selected model and the software version
        (left-aligned).  Second line: ``Page <n>/<total>`` (centred).
        """
        # A negative y is measured from the bottom of the page.
        self.set_y(-40)
        self.set_font('Times', 'I', 12)
        # Fixed the "Sofware" typo that was printed on every report page.
        self.cell(0, 10, 'Model Selected: ' + MODEL_SELECTED +
                  ' Software Version:' + SOFTWARE_VERSION,
                  new_x=XPos.LMARGIN, new_y=YPos.NEXT, align='L')
        # Page number {nb} comes from alias_nb_page
        self.cell(0, 10, 'Page ' + str(self.page_no()) + '/{nb}',
                  new_x=XPos.RIGHT, new_y=YPos.TOP, align='C')
# AUDIO SETUP
FAIL_SOUND = 'fail.mp3'
PASS_SOUND = 'pass.mp3'
PLAY_SOUND = True
pygame.init()
pygame.mixer.init()
# Load and play each sound once at start-up so the library's dependencies
# are loaded before the first real result is announced.
for _startup_sound in (FAIL_SOUND, PASS_SOUND):
    pygame.mixer.music.load(_startup_sound)
    pygame.mixer.music.play()
    # Block until playback has finished.
    while pygame.mixer.music.get_busy():
        pygame.time.Clock().tick(10)
del _startup_sound
def get_ip():
    """Return the raw stdout of ``hostname -I`` decoded as UTF-8."""
    completed = subprocess.run(['hostname', '-I'], stdout=subprocess.PIPE)
    return completed.stdout.decode('utf-8')
def terminate_script():
    """Close the GUI window and stop the interpreter via SystemExit."""
    window.close()
    raise SystemExit("Terminated at admin request")
def reboot_system():
    """Reboot the host by running ``sudo reboot`` through the shell."""
    reboot_command = 'sudo reboot'
    system(reboot_command)
def shutdown_system():
    """Halt the host by running ``sudo shutdown -h now`` through the shell."""
    shutdown_command = 'sudo shutdown -h now'
    system(shutdown_command)
def set_bias_mux_to_low_range():
    """Drive the A_BIAS_MUX pin low, logging entry and exit."""
    log_print("set_bias_mux_to_low_range() called")
    bias_pin, level = A_BIAS_MUX, GPIO.LOW
    GPIO.output(bias_pin, level)
    log_print("set_bias_mux_to_low_range() returning")
def set_bias_mux_to_hi_range():
    """Drive the A_BIAS_MUX pin high, logging entry and exit."""
    log_print("set_bias_mux_to_hi_range() called")
    bias_pin, level = A_BIAS_MUX, GPIO.HIGH
    GPIO.output(bias_pin, level)
    log_print("set_bias_mux_to_hi_range() returning")
def set_dut_mux_to_input_res():
    """Drive both DUT mux select pins (A, then B) low."""
    log_print("set_dut_mux_to_input_res() called")
    for select_pin in (A_DUT_MUX, B_DUT_MUX):
        GPIO.output(select_pin, GPIO.LOW)
    log_print("set_dut_mux_to_input_res() returning")
def set_dut_mux_to_output_res():
    """Drive DUT mux select pin A high and pin B low."""
    log_print("set_dut_mux_to_output_res() called")
    pin_levels = ((A_DUT_MUX, GPIO.HIGH), (B_DUT_MUX, GPIO.LOW))
    for select_pin, level in pin_levels:
        GPIO.output(select_pin, level)
    log_print("set_dut_mux_to_output_res() returning")
def reset_mouse_position():
    """Move the mouse pointer to the fixed screen position (700, 160)."""
    park_x, park_y = 700, 160
    pyautogui.moveTo(park_x, park_y)
def no_blank_screen():
    """Run three ``xset`` commands (noblank, -dpms, s off) through the shell."""
    log_print("no_blank_screen() called")
    xset_commands = ('xset s noblank', 'xset -dpms', 'xset s off')
    for xset_command in xset_commands:
        system(xset_command)
    log_print("no_blank_screen() returning")
def audio_feedback(local_result):
    """Play the pass or fail sound and block until playback has finished.

    :param local_result: ``'PASS'`` or ``'FAIL'``; any other value only
        prints a message and plays nothing.

    The module-level PLAY_SOUND flag is cleared before returning if it was
    set.  It does not gate playback in this function.
    """
    global PLAY_SOUND
    log_print("audio_feedback(%s) called" % local_result)
    if local_result == 'FAIL' or local_result == 'PASS':
        sound_file = FAIL_SOUND if local_result == 'FAIL' else PASS_SOUND
        pygame.mixer.music.load(sound_file)
        pygame.mixer.music.play()
        # Wait for the mixer to finish the clip.
        while pygame.mixer.music.get_busy():
            pygame.time.Clock().tick(10)
    else:
        print('result is invalid. Cant play audio')
    if PLAY_SOUND:
        PLAY_SOUND = False
    log_print("audio_feedback(%s) returning" % local_result)
def gui_frame_msngr_update(frame_to_show,
                           new_current_process_message='No message'):
    """Make one of the nine GUI frames visible and push a message to it.

    :param frame_to_show: frame number 1-9; any other value shows frame 2.
    :param new_current_process_message: text written to the messenger
        window and, for frames 3, 4, 5, 8 and 9, to that frame's text box.

    The previous version compared a 6-element visibility list with a
    9-element target list, so the "nothing changed" check could never
    succeed and every call re-applied visibility to all frames.  All nine
    frames are now compared.  The window is still refreshed on every call
    so message-only updates are drawn.
    """
    user_messager_window.update(new_current_process_message)
    frames = (frame1, frame2, frame3, frame4, frame5,
              frame6, frame7, frame8, frame9)
    current_frame_visibility = [frame.visible for frame in frames]

    frame_update = [frame_to_show == number for number in range(1, 10)]
    if not any(frame_update):
        # Unknown frame number: fall back to frame 2, as before.
        frame_update[1] = True

    # Frames that carry their own message box.
    if frame_to_show == 3:
        reset_mouse_position()
        keypad_message_box.update(new_current_process_message)
    elif frame_to_show == 4:
        pass_test_text_box.update(new_current_process_message)
    elif frame_to_show == 5:
        fail_test_text_box.update(new_current_process_message)
    elif frame_to_show == 8:
        frame8_message_box.update(new_current_process_message)
    elif frame_to_show == 9:
        frame9_ip_add_box.update(new_current_process_message)

    if current_frame_visibility != frame_update:
        for frame, make_visible in zip(frames, frame_update):
            frame.update(visible=make_visible)
    window.refresh()
def show_calcheck_results(measurements, button_text, cal_reset):
    """Build the operator message shown after a calibration-check measurement.

    :param measurements: sequence whose first two items are the measured
        resistances.
    :param button_text: ``'CHECK LOW RANGE'`` or ``'CHECK HIGH RANGE'``;
        anything else yields an error message.
    :param cal_reset: truthy to label the values UPPER-END/LOWER-END,
        falsy to label them INPUT/OUTPUT.
    :return: the multi-line message string.
    """
    log_print("show_calcheck_results([%.3f,%.3f], %s)" % (
        measurements[0], measurements[1], button_text))

    if button_text == 'CHECK LOW RANGE':
        add_string = ('INSERT HIGH-RANGE\n'
                      'RESISTANCE FIXTURE AND PRESS\n'
                      '\'CHECK HIGH RANGE\'')
    elif button_text == 'CHECK HIGH RANGE':
        add_string = ('PRESS \'APPROVE & EXIT\'\n'
                      'OR \'REDO CAL REF\'\n'
                      'IF OUT OF TOLERANCE')
    else:
        log_print("show_calcheck_results() returning")
        return 'ERROR IN show_calcheck_results\nCONTACT ENGINEERING'

    if cal_reset:
        template = ('UPPER-END RESISTANCE: %.3f\n'
                    'LOWER-END RESISTANCE: %.3f\n%s')
    else:
        template = ('INPUT RESISTANCE: %.3f\n'
                    'OUTPUT RESISTANCE: %.3f\n%s')
    first_value = round(measurements[0], 3)
    second_value = round(measurements[1], 3)
    res_message = template % (first_value, second_value, add_string)
    log_print("show_calcheck_results() returning")
    return res_message
def show_results(cath_test_result, barcode):
    """Build the operator message shown after a catheter has been tested.

    The heading depends on the module flags SHOW_LAST_CATH_GUI_MSG (checked
    first) and REPEATED_CATHETER_DETECTED.  The last line is always
    ``<barcode>: <result>``.
    """
    log_print("show_results(%s, %s) called" % (cath_test_result, barcode))
    if SHOW_LAST_CATH_GUI_MSG:
        template = 'JOB FINISHED\nUNPLUG CATHETER\nTO PRINT REPORT\n%s: %s'
    elif REPEATED_CATHETER_DETECTED:
        template = 'REPEATED CATHETER\nREADY FOR\nNEXT CATHETER\n%s: %s'
    else:
        template = 'READY FOR\nNEXT CATHETER\n%s: %s'
    message = template % (barcode, cath_test_result)
    log_print("show_results() returning")
    return message
def alrt_rdy_func_enable():
    """Write the low- then high-threshold register data to the ADC over I2C.

    Presumably this is what turns the ALERT/RDY pin into a
    conversion-ready signal -- confirm against the configuration data in
    hw_init.
    """
    log_print("alrt_rdy_func_enable() called")
    threshold_writes = (
        (REG_LOTHRESH_ADDR, LOTHRESH_CONFIGURATION_DATA),
        (REG_HITHRESH_ADDR, HITHRESH_CONFIGURATION_DATA),
    )
    for register, payload in threshold_writes:
        i2cbus.write_i2c_block_data(I2C_DEV_ADDR, register, payload)
    log_print("alrt_rdy_func_enable() returning")
def configure_adc(config_reg_data):
    """Write ``config_reg_data`` to the ADC configuration register over I2C."""
    device, register = I2C_DEV_ADDR, REG_CONFIG_ADDR
    i2cbus.write_i2c_block_data(device, register, config_reg_data)
def isr_enable():
    """Register read_adc as the falling-edge callback of the ALERT/RDY pin."""
    log_print("isr_enable() called")
    ready_pin = ADS1115_ALRT_RDY_SIG
    GPIO.add_event_detect(ready_pin, GPIO.FALLING)
    GPIO.add_event_callback(ready_pin, read_adc)
    log_print("isr_enable() returning")
def isr_disable():
    """Log an ISR-disable request without touching the GPIO edge detection.

    The call that would remove the edge detection is commented out, so
    this function currently only writes its two log lines.
    """
    log_print("isr_disable() called")
    # Disabled in the source: GPIO.remove_event_detect(ADS1115_ALRT_RDY_SIG)
    log_print("isr_disable() returning")
def adc_decode(adc_codes):
    """Convert the two conversion-register bytes into a voltage.

    :param adc_codes: sequence ``[msb, lsb]`` as read from the ADC.
    :return: the signed 16-bit code multiplied by the module constant LSB.

    The ADS1115 conversion register holds a 16-bit two's-complement value.
    Codes with the top bit set are negative; they can occur around 0 V in
    single-ended mode.  They were previously decoded as large positive
    voltages close to full scale.
    """
    adc_code = (adc_codes[0] << 8) | adc_codes[1]
    if adc_code & 0x8000:
        # Sign-extend the 16-bit two's-complement code.
        adc_code -= 0x10000
    return adc_code * LSB
def volt2res(voltage):
    """Convert a measured voltage into the resistance of the device under test.

    The result is the transfer-function value minus the initial and the
    dynamic linear correction terms (each ``m * voltage + b``).  The high
    or low range coefficient set is chosen from CATH_MODEL and
    MEASURE_OUTPUT_RESISTANCE:

    * P330  -> high range for both input and output measurements
    * P330B -> high range for output, low range for input
    * P16x and any unknown model -> low range (unknown models also print
      a warning)

    Model selectors are now compared with ``==``.  The previous ``is``
    comparison only worked because CPython happens to cache small
    integers.
    """
    log_print("volt2res(%f) called" % voltage)

    if CATH_MODEL == MODEL_P16x_SEL:
        use_high_range = False
    elif CATH_MODEL == MODEL_P330_SEL:
        use_high_range = True
    elif CATH_MODEL == MODEL_P330B_SEL:
        use_high_range = bool(MEASURE_OUTPUT_RESISTANCE)
    else:
        use_high_range = False
        measurement = "output" if MEASURE_OUTPUT_RESISTANCE else "input"
        print("Something went wrong when selecting the correction factor "
              "during %s resistance measurement. CATH_MODEL = " % measurement,
              CATH_MODEL)

    if use_high_range:
        tfco = TFCO_HIGH
        correction_factor = \
            (initial_m_high_range * voltage) + initial_b_high_range
        dynamic_correction_factor = \
            (dynamic_m_high_range * voltage) + dynamic_b_high_range
    else:
        tfco = TFCO_LOW
        correction_factor = \
            (initial_m_low_range * voltage) + initial_b_low_range
        dynamic_correction_factor = \
            (dynamic_m_low_range * voltage) + dynamic_b_low_range

    transfer_func = RGAIN1 * ((10 * ((voltage - V_BIAS) * tfco + V_BIAS)) - 1)
    r_dut = transfer_func - correction_factor - dynamic_correction_factor
    log_print("volt2res() returning")
    return r_dut
def configure_measurement():
    """Set the muxes and module-level limits for the next measurement.

    The DUT mux is switched according to MEASURE_OUTPUT_RESISTANCE.  Then,
    depending on CATH_MODEL, the bias mux is switched and MODEL_RES,
    MODEL_TOL, V_BIAS, RANGE_LIMIT_LOW and RANGE_LIMIT_HIGH are updated.
    For an unknown model only an error is logged; the bias mux and the
    limits keep their previous values.

    Model selectors are compared with ``==`` (previously ``is``, which
    relied on CPython's small-integer caching).  The invalid-model log
    uses ``%s`` so that a non-numeric CATH_MODEL cannot raise inside the
    error path.
    """
    global MODEL_RES, MODEL_TOL, V_BIAS, RANGE_LIMIT_LOW, RANGE_LIMIT_HIGH
    log_print("configure_measurement() called")

    # Pick (expected resistance, tolerance, use high bias range) per model.
    selection = None
    if MEASURE_OUTPUT_RESISTANCE:
        set_dut_mux_to_output_res()
        if CATH_MODEL == MODEL_P16x_SEL:
            selection = (MODEL_P16x_OUTPUT_RES, MODEL_P16x_TOL, False)
        elif CATH_MODEL == MODEL_P330_SEL:
            selection = (MODEL_P330_OUTPUT_RES, MODEL_P330_OUTPUT_TOL, True)
        elif CATH_MODEL == MODEL_P330B_SEL:
            selection = (MODEL_P330B_OUTPUT_RES, MODEL_P330B_OUTPUT_TOL, True)
    else:
        set_dut_mux_to_input_res()
        if CATH_MODEL == MODEL_P16x_SEL:
            selection = (MODEL_P16x_INPUT_RES, MODEL_P16x_TOL, False)
        elif CATH_MODEL == MODEL_P330_SEL:
            selection = (MODEL_P330_INPUT_RES, MODEL_P330_INPUT_TOL, True)
        elif CATH_MODEL == MODEL_P330B_SEL:
            # P330B input resistance is measured on the low range.
            selection = (MODEL_P330B_INPUT_RES, MODEL_P330B_INPUT_TOL, False)

    if selection is None:
        log_print("InvalidModelError:model index "
                  "value is invalid (model=%s)" % CATH_MODEL)
    else:
        expected_res, tolerance, use_high_range = selection
        if use_high_range:
            set_bias_mux_to_hi_range()
            V_BIAS = V_BIAS_HIGH
            RANGE_LIMIT_LOW = HIGHRANGE_LIMIT_LOW
            RANGE_LIMIT_HIGH = HIGHRANGE_LIMIT_HIGH
        else:
            set_bias_mux_to_low_range()
            V_BIAS = V_BIAS_LOW
            RANGE_LIMIT_LOW = LOWRANGE_LIMIT_LOW
            RANGE_LIMIT_HIGH = LOWRANGE_LIMIT_HIGH
        MODEL_RES = expected_res
        MODEL_TOL = tolerance
    log_print("configure_measurement() returning")
def jobsize_capture(jobsize_string):
    """Parse and validate the job size entered by the operator.

    :param jobsize_string: text from the keypad input.
    :return: True when the text is an integer strictly between 0 and
        MAX_CATHETERS_PER_JOB, otherwise False.

    JOB_SIZE is updated whenever the text parses as an integer, even if it
    is then rejected as out of range (unchanged behaviour).  Non-numeric
    text is now rejected instead of raising ValueError.
    """
    global JOB_SIZE
    log_print("jobsize_capture(%s) called" % jobsize_string)
    if len(jobsize_string) == 0:
        log_print("NO VALUE ENTERED.")
        log_print("jobsize_capture() returning")
        return False
    try:
        JOB_SIZE = int(jobsize_string)
    except ValueError:
        log_print("INVALID JOB SIZE.")
        log_print("jobsize_capture() returning")
        return False
    if 0 < JOB_SIZE < MAX_CATHETERS_PER_JOB:
        log_print("jobsize_capture() returning")
        return True
    log_print("INVALID JOB SIZE.")
    log_print("jobsize_capture() returning")
    return False
def validate_job_number_barcode_scan(x):
    """Return True when ``x`` is an integer string with 999 < value < 100000000.

    Text that cannot be parsed as an integer is logged as invalid and
    yields False.
    """
    log_print("validate_job_number_barcode_scan(%s) called" % x.strip())
    try:
        code_validity = len(x) > 0 and 999 < int(x.strip()) < 100000000
    except ValueError:
        log_print("Invalid Job Number")
        return False
    log_print("validate_job_number_barcode_scan() returning")
    return code_validity
def jobnumber_capture():
    """Scan a job-number barcode and store it in JOB_NUMBER.

    :return: True when a scan longer than one character validates as a
        job number, otherwise False.

    Up to three scans are made, but only a one-character result triggers
    another attempt; an empty or longer result ends the loop at once.
    The function now returns False explicitly when all three attempts
    yield a single character (it used to fall off the end and return
    None).
    """
    global JOB_NUMBER
    log_print("jobnumber_capture() called")
    for _attempt in range(3):
        JOB_NUMBER, _scanner_validity = barcode_scanner()
        if len(JOB_NUMBER) > 1:
            is_valid = bool(validate_job_number_barcode_scan(JOB_NUMBER))
            log_print("jobnumber_capture() returning")
            return is_valid
        if not JOB_NUMBER:
            log_print("jobnumber_capture() returning")
            return False
    log_print("jobnumber_capture() returning")
    return False
def model_capture():
    """Translate MODEL_SELECTED (a model name) into the CATH_MODEL selector.

    An unrecognised name leaves CATH_MODEL unchanged and is logged.
    """
    global CATH_MODEL
    log_print("model_capture() called")
    selectors = (
        ('P16x', MODEL_P16x_SEL),
        ('P330', MODEL_P330_SEL),
        ('P330B', MODEL_P330B_SEL),
    )
    for model_name, selector in selectors:
        if MODEL_SELECTED == model_name:
            CATH_MODEL = selector
            break
    else:
        log_print("INVALID MODEL SELECTED!")
    log_print("model_capture() returning")
def waiting_for_manual_barcode():
    """Block until MANUAL_BARCODE_CAPTURED is set, then return the barcode.

    :return: the current value of KEYPAD_CATH_BARCODE.

    The flag is polled every 100 ms instead of spinning on ``pass``.  A
    tight spin here would keep a CPU core busy and compete with the other
    Python threads for the interpreter lock for as long as the operator
    is typing.
    """
    global MANUAL_BARCODE_CAPTURED
    log_print("waiting_for_manual_barcode() called")
    while not MANUAL_BARCODE_CAPTURED:
        sleep(.1)
    # Settling delay kept from the original; presumably it gives the code
    # that set the flag time to publish KEYPAD_CATH_BARCODE -- TODO confirm.
    sleep(.1)
    MANUAL_BARCODE_CAPTURED = False
    local_barcode = KEYPAD_CATH_BARCODE
    log_print("waiting_for_manual_barcode() returning")
    return local_barcode
def validate_catheter_barcode_scan(x):
    """Return True when ``x`` has a length of 6 to 9 inclusive.

    Objects without a length (TypeError from ``len``) count as invalid.
    """
    log_print("validate_catheter_barcode_scan(%s) called" % x)
    try:
        code_validity = 5 < len(x) < 10
    except TypeError:
        code_validity = False
    log_print("validate_catheter_barcode_scan() returning")
    return code_validity
def manual_catheter_barcode_entry():
    """Show the keypad frame and return a validated, hand-typed barcode.

    Digit events are appended to the ``input`` element, ``Clear`` empties
    it, and ``Submit`` validates the text with
    validate_catheter_barcode_scan.  Invalid entries show
    CATH_KEYPAD_INVALID_MESSAGE and the loop continues.

    :return: the accepted barcode string.
    :raises RuntimeError: if the window is closed (event is None) while
        waiting.  This used to surface as a TypeError from
        ``None in '1234567890'``.
    """
    log_print("manual_catheter_barcode_entry() called")
    keypad_back_btn.update('')
    gui_frame_msngr_update(3, CATH_KEYPAD_MESSAGE)
    while True:
        event, values = window.read()
        if event is None:
            raise RuntimeError(
                "GUI window closed during manual catheter barcode entry")
        # Require exactly one digit.  A bare ``in`` test on a string is a
        # substring test, so it also matches '' and multi-digit strings.
        if isinstance(event, str) and len(event) == 1 \
                and event in '1234567890':
            window['input'].update(values['input'] + event)
        elif event == 'Submit':
            keys_entered = values['input']
            window['input'].update('')
            if validate_catheter_barcode_scan(keys_entered):
                log_print("manual_catheter_barcode_entry() returning")
                return keys_entered
            window['keypad_message'].update(CATH_KEYPAD_INVALID_MESSAGE)
        elif event == 'Clear':  # clear keys if clear button
            window['input'].update('')
def barcode_scanner():
    """Trigger one barcode read and return ``(barcode, code_validity)``.

    The scanner reply is read up to and including ``\\r``.  Everything to
    the right of the first ``"31"`` is the barcode; it may still carry
    the trailing ``\\r``, so callers strip it.  Both the ``\\r``
    terminator and the ``"31"`` marker are specific to the current
    scanner model and may have to change if the scanner is replaced.

    If the reply does not contain ``"31"`` (for example an empty reply),
    an empty barcode is returned.  The previous indexing raised
    IndexError in that case, so callers never reached their manual-entry
    fallback.
    """
    log_print("barcode_scanner() called")
    ser.write(SCNR_TRGR_CMD_BYTES)  # Hex command that triggers a read
    raw_reply = ser.read_until(b'\r')
    _prefix, marker, barcode = raw_reply.decode().partition("31")
    if not marker:
        log_print("barcode_scanner(): no '31' marker in scanner reply")
        barcode = ''
    code_validity = validate_catheter_barcode_scan(barcode)
    log_print("barcode_scanner() returning")
    return barcode, code_validity
def print_report(local_report_filename):
    """Send the report file to the default printer with ``sudo lp``.

    The command is passed as an argument list rather than a concatenated
    shell string, so a filename containing spaces or shell metacharacters
    is handed to ``lp`` unchanged.  The exit status is ignored, as before.
    """
    log_print("print_report(%s) called" % local_report_filename)
    subprocess.run(['sudo', 'lp', local_report_filename])
    log_print("print_report() returning")
def sort_data():
    """Return a copy of cath_data_dict_unsorted_buffer ordered by key."""
    log_print("sort_data() called")
    unsorted_records = cath_data_dict_unsorted_buffer
    catheter_data_dict_sorted = {
        barcode: unsorted_records[barcode]
        for barcode in sorted(unsorted_records.keys())
    }
    log_print("sort_data() returning")
    return catheter_data_dict_sorted
def correction_fctr_calc(xlist, ylist):
    """Fit a straight line to the resistance error of each range.

    :param xlist: four values; indices 0/1 belong to the low range and
        2/3 to the high range (used as the x axis).
    :param ylist: four measured resistances in the same order.  The error
        is the measurement minus the matching CAL_REF_RESISTANCE_*
        constant.
    :return: ``(m_low_range, b_low_range, m_high_range, b_high_range)``.
    """
    log_print("correction_factor_calculation() called")

    def fit_line(x_first, x_second, err_first, err_second):
        # Slope through the two error points, intercept from the first.
        slope = (err_first - err_second) / (x_first - x_second)
        return slope, err_first - slope * x_first

    m_low_range, b_low_range = fit_line(
        xlist[0], xlist[1],
        ylist[0] - CAL_REF_RESISTANCE_LOWRANGE_HIGH,
        ylist[1] - CAL_REF_RESISTANCE_LOWRANGE_LOW)
    m_high_range, b_high_range = fit_line(
        xlist[2], xlist[3],
        ylist[2] - CAL_REF_RESISTANCE_HIRANGE_HIGH,
        ylist[3] - CAL_REF_RESISTANCE_HIRANGE_LOW)
    log_print("correction_factor_calculation() returning")
    return m_low_range, b_low_range, m_high_range, b_high_range
def correction_value(m, b, x):
    """Evaluate the line ``m * x + b`` and return the result."""
    log_print("correction_value() called")
    scaled = m * x
    corr_val = scaled + b
    log_print("correction_value() returning")
    return corr_val
def _collect_cal_samples(config_reg, sample_count, sample_period,
                         discard_count):
    """Take ``sample_count`` single-shot readings; drop the first ``discard_count``.

    Each reading is triggered with configure_adc(config_reg) and is
    complete when the ADC callback sets CONVERSION_READY.
    NOTE(review): there is no timeout -- if the callback never fires this
    loop waits forever.
    """
    global CONVERSION_READY
    readings = []
    for _ in range(sample_count):
        configure_adc(config_reg)
        while not CONVERSION_READY:
            sleep(sample_period)
        CONVERSION_READY = False
        readings.append(adc_decode(ADC_SAMPLE_LATEST))
    return readings[discard_count:]


def calibration():
    """Measure the four internal reference resistors and update calibration.

    Pass 1 steps through the four mux combinations (low range high/low
    reference, then high range high/low reference), measuring V_BIAS and
    the reference resistance for each.  If any measurement is off by more
    than CAL_REF_RESISTANCE_TOL, the dynamic correction coefficients are
    recomputed and pass 2 re-measures all four references with them.

    Results are published through the module globals CAL_PASSED and
    CAL_FAIL and the four ``dynamic_*`` coefficients.

    Fixes relative to the previous version:

    * The coefficient limit check tested ``dynamic_m_low_range`` four
      times; it now covers all four coefficients.  NOTE(review): confirm
      that a limit of 10 is also intended for the ``b`` offsets.
    * The muxes and CATH_MODEL are restored after pass 2 on failure as
      well; previously a failed calibration left CATH_MODEL overwritten
      and the muxes in calibration state.
    * NOTE(review): CAL_PASSED is still left untouched on failure, so a
      value of True from an earlier run survives -- check the callers.
    """
    global CAL_PASSED, V_BIAS, CATH_MODEL, CAL_FAIL, \
        dynamic_m_low_range, dynamic_b_low_range, dynamic_m_high_range, \
        dynamic_b_high_range
    log_print("calibration() called")
    temp_cath_model = CATH_MODEL
    samples_to_take = 120
    samples_to_remove_cal = samples_to_take // 2
    sample_period = 0.004
    max_dynamic_coefficient = 10
    CAL_FAIL = False
    GPIO.output(B_DUT_MUX, GPIO.HIGH)
    cal_resistances = [CAL_REF_RESISTANCE_LOWRANGE_HIGH,
                       CAL_REF_RESISTANCE_LOWRANGE_LOW,
                       CAL_REF_RESISTANCE_HIRANGE_HIGH,
                       CAL_REF_RESISTANCE_HIRANGE_LOW]
    x_list = []
    y_list = []
    v_bias_cal = []
    recalculate_dynamic_coefficients = False
    log_print('\n\n===INITIATING CALIBRATION===\n')

    # ---- Pass 1: measure with the current coefficients ------------------
    for i in range(4):
        # Two-bit counter: high bit selects the range, low bit the reference.
        range_bit, dut_bit = divmod(i, 2)
        GPIO.output(A_DUT_MUX, dut_bit)
        GPIO.output(CAL_RES_HI_RANGE_MUX, range_bit)
        GPIO.output(A_BIAS_MUX, range_bit)
        vbias_samples = _collect_cal_samples(
            SINGLESHOT_VBIAS_CONFIG_REG, samples_to_take, sample_period,
            samples_to_remove_cal)
        V_BIAS = sum(vbias_samples) / len(vbias_samples)
        log_print("V_BIAS = %f" % V_BIAS)
        v_bias_cal.append(V_BIAS)
        # volt2res picks its range from CATH_MODEL, so it is temporarily
        # overwritten with the range bit and restored at the end.
        CATH_MODEL = range_bit
        sleep(.35)  # to allow the switches to settle
        log_print('SAMPLING INTERNAL RESISTANCE...')
        voltage_samples = _collect_cal_samples(
            SINGLESHOT_CONFIG_REG, samples_to_take, sample_period,
            samples_to_remove_cal)
        avg_voltage = round(sum(voltage_samples) / len(voltage_samples), 3)
        x_list.append(avg_voltage)
        cal_res_msrmnt = round(volt2res(avg_voltage), 3)
        y_list.append(cal_res_msrmnt)
        if (abs(cal_res_msrmnt - cal_resistances[i]) /
                cal_resistances[i]) > CAL_REF_RESISTANCE_TOL:
            recalculate_dynamic_coefficients = True
        log_print('avg_voltage:%.3f\navg_res:%.3f' % (
            avg_voltage, cal_res_msrmnt))

    if not recalculate_dynamic_coefficients:
        GPIO.output(A_DUT_MUX, GPIO.LOW)
        GPIO.output(B_DUT_MUX, GPIO.LOW)
        CATH_MODEL = temp_cath_model
        CAL_PASSED = True
        log_print('Dynamic coefficients were not recalculated. '
                  'Calibration is still stable.')
        return

    # ---- Recompute the dynamic correction -------------------------------
    dynamic_m_low_range, dynamic_b_low_range, \
        dynamic_m_high_range, dynamic_b_high_range = \
        correction_fctr_calc(x_list, y_list)
    log_print('dynamic_m_low_range, dynamic_b_low_range, '
              'dynamic_m_high_range, dynamic_b_high_range:\n%f %f %f %f' %
              (dynamic_m_low_range, dynamic_b_low_range,
               dynamic_m_high_range, dynamic_b_high_range))
    log_print("DYNAMIC CORRECTION FACTOR CALCULATED!")
    # This condition limits how much the system can correct itself
    # from the initial correction factors before triggering
    # an engineer to recalibrate the system.
    if any(abs(coefficient) > max_dynamic_coefficient
           for coefficient in (dynamic_m_low_range, dynamic_b_low_range,
                               dynamic_m_high_range, dynamic_b_high_range)):
        CAL_FAIL = True

    # ---- Pass 2: verify with the new coefficients -----------------------
    for i in range(4):
        range_bit, dut_bit = divmod(i, 2)
        GPIO.output(A_DUT_MUX, dut_bit)
        GPIO.output(CAL_RES_HI_RANGE_MUX, range_bit)
        GPIO.output(A_BIAS_MUX, range_bit)
        V_BIAS = v_bias_cal[i]
        CATH_MODEL = range_bit
        log_print('SAMPLING INTERNAL RESISTANCES '
                  'WITH DYNAMIC CORRECTION FACTOR...')
        voltage_samples = _collect_cal_samples(
            SINGLESHOT_CONFIG_REG, samples_to_take, sample_period,
            samples_to_remove_cal)
        avg_voltage = round(sum(voltage_samples) / len(voltage_samples), 3)
        error_magnitude_voltage = round(
            max(voltage_samples) - min(voltage_samples), 3)
        error_plus_minus_voltage = round(error_magnitude_voltage / 2, 3)
        cal_res_msrmnt = round(volt2res(avg_voltage), 3)
        error_magnitude_res = round(
            volt2res(max(voltage_samples)) - volt2res(
                min(voltage_samples)), 3)
        error_plus_minus_res = round(error_magnitude_res / 2, 3)
        cal_resistance_tolerance = round(
            cal_resistances[i] * CAL_REF_RESISTANCE_TOL, 3)
        log_print('avg_voltage:%.3f +/-%.3f\navg_res:%.3f +/-%.3f\n' % (
            avg_voltage, error_plus_minus_voltage,
            cal_res_msrmnt, error_plus_minus_res))
        if (abs(cal_res_msrmnt - cal_resistances[i]) / cal_resistances[i])\
                < CAL_REF_RESISTANCE_TOL:
            log_print("Measured calibrated resistance passed\n")
        else:
            CAL_FAIL = True
            log_print("Measured calibrated resistance out of tolerance\n")
        log_print(
            'Expected Ω: %.3fΩ\nCalculated Ω: %fΩ\n' % (
                cal_resistances[i], cal_res_msrmnt))
        log_print(
            'resistance tolerance: %.3f\n' % cal_resistance_tolerance)
        log_print('resistance error measured: %.3f\n\n' % (
            cal_res_msrmnt - cal_resistances[i]))

    # Always hand the hardware and the operator's model selection back.
    GPIO.output(A_DUT_MUX, GPIO.LOW)
    GPIO.output(B_DUT_MUX, GPIO.LOW)
    CATH_MODEL = temp_cath_model
    if CAL_FAIL is False:
        CAL_PASSED = True
        print('===CALIBRATION DONE===')
def read_adc(gpionum):
    """GPIO callback: read one ADC conversion and advance the test state machine.

    :param gpionum: channel number passed by the GPIO library (unused).

    Every call stores the raw bytes in ADC_SAMPLE_LATEST and sets
    CONVERSION_READY (polled by calibration()).  Then, depending on the
    module flags:

    1. Idle (connector empty, calibration passed, no external cal):
       count consecutive readings below CATH_DETECTED_VOLTAGE; once
       enough are seen, configure the measurement and start V_BIAS
       sampling.
    2. MEASURE_VBIAS: collect 100 single-shot V_BIAS readings, average
       the last 50, then switch to continuous sampling.
    3. ACTIVE_SAMPLING: collect SAMPLES_REQUIRED readings, then call
       catheter_test().
    4. TEST_FINISHED: either start the output-resistance measurement, or
       wait for the catheter to be unplugged and, after the last catheter
       of the job, write the report.

    An I2C failure (OSError) is now logged and the sample skipped instead
    of propagating out of the callback.

    NOTE(review): catheter_test() and write_to_report() run inside this
    callback.  catheter_test() can block on the barcode scanner and on
    manual keypad entry, and no further conversions are handled by this
    function until it returns.  Handing that work to the main loop (for
    example through a queue) would keep the callback short.
    """
    global CATH_CONN_EMPTY, RES_SAMPLES_VALS, ACTIVE_SAMPLING, \
        CATH_RES_SAMPLES_COLLECTED, TEST_FINISHED, \
        ADC_SAMPLE_LATEST, CATH_DETECTED_SAMPLES_COLLECTED, \
        CATH_DISCONN_SAMPLES_COLLECTED, end_job, \
        CONVERSION_READY, MEASURE_VBIAS, VBIAS_SAMPLES_BUFFER, V_BIAS, \
        report_filename
    vbias_samples_wanted = 100
    vbias_samples_discarded = 50

    try:
        ADC_SAMPLE_LATEST = i2cbus.read_i2c_block_data(
            I2C_DEV_ADDR, REG_CONVERSION_ADDR, 2)
    except OSError as error:
        log_print("read_adc(): I2C read failed: %s" % error)
        return
    CONVERSION_READY = True
    avg_voltage = adc_decode(ADC_SAMPLE_LATEST)

    if CATH_CONN_EMPTY and CAL_PASSED and (
            not ACTIVE_SAMPLING) and not EXTERNAL_CAL_INPROGRESS:
        # State 1: waiting for a catheter to be plugged in.
        if CATH_DETECTED_SAMPLES_COLLECTED < CATH_DETECTED_SAMPLES_REQUIRED:
            if avg_voltage < CATH_DETECTED_VOLTAGE:
                CATH_DETECTED_SAMPLES_COLLECTED += 1
            else:
                # Require consecutive readings: any miss restarts the count.
                CATH_DETECTED_SAMPLES_COLLECTED = 0
        else:
            configure_measurement()
            MEASURE_VBIAS = True
            CATH_CONN_EMPTY = False
            CATH_DETECTED_SAMPLES_COLLECTED = 0
            log_print("CATHETER DETECTED. STARTING TEST...")
    elif MEASURE_VBIAS:
        # State 2: sample the bias voltage in single-shot mode.
        if len(VBIAS_SAMPLES_BUFFER) < vbias_samples_wanted:
            VBIAS_SAMPLES_BUFFER.append(avg_voltage)
            configure_adc(SINGLESHOT_VBIAS_CONFIG_REG)
        else:
            MEASURE_VBIAS = False
            VBIAS_SAMPLES_BUFFER = \
                VBIAS_SAMPLES_BUFFER[vbias_samples_discarded:]
            V_BIAS = sum(VBIAS_SAMPLES_BUFFER) / len(VBIAS_SAMPLES_BUFFER)
            VBIAS_SAMPLES_BUFFER = []
            ACTIVE_SAMPLING = True
            log_print('VBIAS = %f' % V_BIAS)
            configure_adc(CONTINUOUS_CONFIG_REG)
    elif ACTIVE_SAMPLING:
        # State 3: collect the resistance samples, then evaluate them.
        if CATH_RES_SAMPLES_COLLECTED < SAMPLES_REQUIRED:
            RES_SAMPLES_VALS.append(avg_voltage)
            CATH_RES_SAMPLES_COLLECTED += 1
        else:
            log_print("Finished logging test samples")
            ACTIVE_SAMPLING = False
            catheter_test()

    if TEST_FINISHED:
        # State 4: decide what follows a finished measurement.
        if MEASURE_OUTPUT_RESISTANCE:
            TEST_FINISHED = False
            MEASURE_VBIAS = True
            CATH_RES_SAMPLES_COLLECTED = 0
            RES_SAMPLES_VALS = []
            configure_measurement()
            log_print("Measuring output resistance now...")
        elif CATH_DISCONN_SAMPLES_COLLECTED < CATH_DETECTED_SAMPLES_REQUIRED:
            if avg_voltage > CATH_DETECTED_VOLTAGE:
                CATH_DISCONN_SAMPLES_COLLECTED += 1
            else:
                CATH_DISCONN_SAMPLES_COLLECTED = 0
        else:
            log_print("CATHETER DISCONNECTED. WRITING REPORT...")
            TEST_FINISHED = False
            CATH_CONN_EMPTY = True
            CATH_RES_SAMPLES_COLLECTED = 0
            CATH_DISCONN_SAMPLES_COLLECTED = 0
            RES_SAMPLES_VALS = []
            if catheters_processed == JOB_SIZE:
                report_filename = write_to_report(sort_data())
                end_job = True
def catheter_test():
    """Evaluate the collected samples for the current measurement.

    Called once for the input resistance and once for the output
    resistance of each catheter (selected by MEASURE_OUTPUT_RESISTANCE).
    The averaged resistance goes into ``avg_res`` and PASS/FAIL into
    ``test_result_buffer`` (index 0 = input, 1 = output).

    After the output measurement the barcode is scanned (with manual
    keypad entry as fallback) and the result is stored in
    cath_data_dict_unsorted_buffer as
    ``[result, input_res, output_res, barcode_valid, repeated]``.

    Fix: the scanned barcode is stripped before it is tested for being
    empty.  A whitespace-only reply such as ``"\\r"`` used to count as a
    barcode, skipped manual entry, and was stored under the key ``''``.

    NOTE(review): a repeated barcode only gets its ``repeated`` flag set;
    the stored results are not replaced although the log says "UPDATING
    REPORT".
    NOTE(review): both resistances are range-checked against the
    RANGE_LIMIT_* values of the most recent configuration (the output
    measurement); for a model whose input and output use different
    ranges this looks inconsistent -- confirm.
    NOTE(review): raises ZeroDivisionError if SAMPLES_TO_REMOVE leaves no
    samples.
    """
    global TEST_FINISHED, RES_SAMPLES_VALS, MEASURE_OUTPUT_RESISTANCE, \
        catheters_processed, cath_data_dict_unsorted_buffer, avg_res, \
        GUI_CURRENT_BARCODE, GUI_CURRENT_CATH_RESULT, SHOW_LAST_CATH_GUI_MSG, \
        REPEATED_CATHETER_DETECTED, MANUAL_CATH_BARCODE_CAPTURE, \
        test_result_buffer
    log_print("catheter_test() called")
    RES_SAMPLES_VALS = RES_SAMPLES_VALS[SAMPLES_TO_REMOVE:]
    avg_voltage = sum(RES_SAMPLES_VALS) / len(RES_SAMPLES_VALS)

    # Slot 0 holds the input measurement, slot 1 the output measurement.
    slot = 1 if MEASURE_OUTPUT_RESISTANCE else 0
    avg_res[slot] = round(volt2res(avg_voltage), 2)
    avg_res_dut = avg_res[slot]
    if abs(avg_res_dut - MODEL_RES) < MODEL_TOL:
        test_result_buffer[slot] = "PASS"
    else:
        test_result_buffer[slot] = "FAIL"
    log_print("Test Result:%s" % test_result_buffer[slot])
    log_print("Average resistance read:%f" % avg_res_dut)

    if not MEASURE_OUTPUT_RESISTANCE:
        # Input done; the output measurement follows.
        MEASURE_OUTPUT_RESISTANCE = True
        TEST_FINISHED = True
        return

    scanned_barcode, code_val = barcode_scanner()
    barcode = scanned_barcode.strip()
    if test_result_buffer[0] == "FAIL" or test_result_buffer[1] == "FAIL":
        test_result = "FAIL"
    else:
        test_result = "PASS"

    if not barcode:
        # Trigger main thread to change GUI to capture catheter SN
        # manually, then wait for it: Tkinter does not like working in
        # multiple threads.
        MANUAL_CATH_BARCODE_CAPTURE = True
        typed_barcode = waiting_for_manual_barcode()
        code_val = validate_catheter_barcode_scan(typed_barcode)
        barcode = typed_barcode.strip()

    if barcode in cath_data_dict_unsorted_buffer:
        cath_data_dict_unsorted_buffer[barcode][4] = True
        REPEATED_CATHETER_DETECTED = True
        log_print("REPEATED CATHETER, UPDATING REPORT FOR %s" % barcode)
    else:
        # Out-of-range readings are reported with the sentinel 9999.
        for index in (0, 1):
            if (avg_res[index] > RANGE_LIMIT_HIGH) or (
                    avg_res[index] < RANGE_LIMIT_LOW):
                avg_res[index] = 9999
        cath_data_dict_unsorted_buffer[barcode] = [test_result,
                                                   avg_res[0],
                                                   avg_res[1],
                                                   code_val,
                                                   False]
        catheters_processed += 1

    if catheters_processed < JOB_SIZE:
        log_print("\n\n---READY FOR NEXT CATHETER---\n\n")
    elif catheters_processed == JOB_SIZE:
        log_print("LAST CATHETER OF JOB. UNPLUG TO PRINT REPORT.")
        SHOW_LAST_CATH_GUI_MSG = True
    GUI_CURRENT_BARCODE = barcode
    GUI_CURRENT_CATH_RESULT = test_result
    MEASURE_OUTPUT_RESISTANCE = False
    TEST_FINISHED = True
def loop():
# Top-level GUI and job state machine; it does not return a value.
# Phase 1 (first inner loop): block on window.read() and dispatch on the
# returned event until a job size and a job number have both been accepted.
# Phase 2 (second inner loop): poll module-level flags to run calibration,
# tell the operator about catheter detection / test results, and print the
# report when the job ends.
# NOTE(review): the indentation of this function was lost in this copy of the
# file, so any nesting described in these comments is inferred from the
# if/elif/else keywords only -- confirm against the running script.
# Module-level names declared global so that assignments below rebind them.
# NOTE(review): flags that are only read here (CATH_CONN_EMPTY,
# ACTIVE_SAMPLING, TEST_FINISHED, MEASURE_OUTPUT_RESISTANCE,
# GUI_CURRENT_BARCODE, GUI_CURRENT_CATH_RESULT, ...) are not listed; reading
# them works without `global`, but sharing this much mutable state between
# the GUI loop and the measurement code is a likely source of race bugs.
global catheters_processed, CATH_MODEL, \
JOB_SIZE, JOB_NUMBER, CAL_PASSED, \
end_job, cath_data_dict_unsorted_buffer, \
MODEL_SELECTED, CURRENT_PROCESS_MESSAGE, \
SHOW_LAST_CATH_GUI_MSG, REPEATED_CATHETER_DETECTED, \
KEYPAD_CATH_BARCODE, MANUAL_BARCODE_CAPTURED, \
MANUAL_CATH_BARCODE_CAPTURE, CATH_RES_SAMPLES_COLLECTED, \
PLAY_SOUND, PROCESS_MESSENGER_FONT, EXTERNAL_CAL_INPROGRESS, \
EXTERNAL_CAL_MEASURED_RES_VALS, EXTERNAL_CAL_USER_ENTERED_RES_VALS, \
dynamic_m_low_range, dynamic_b_low_range, dynamic_m_high_range, \
dynamic_b_high_range, initial_b_low_range, initial_m_low_range, \
initial_b_high_range, initial_m_high_range, \
CAL_REF_RESISTANCE_LOWRANGE_LOW, CAL_REF_RESISTANCE_LOWRANGE_HIGH, \
CAL_REF_RESISTANCE_HIRANGE_LOW, \
CAL_REF_RESISTANCE_HIRANGE_HIGH, CAL_FAIL
# Local bookkeeping for the current run of the loop.
# NOTE(review): cal_res_count, keys_entered, cal_keys_entered are NOT
# initialised here although later branches read/assign them as locals.
first_iteration = True
jobsize_valid = False
jobnumber_valid = False
reset_mouse_position()
first_job_done = False
res_measurements = []
display_cal_check_measurements = False
x_list = [[], []]
cal_res_fixture_count = 0
admin_logon_attempt = False
notify_user_cath_detected = True
notify_user_test_result = True
cath_data_dict_unsorted_buffer = {}
while True:
# ---- Phase 1: event-driven job set-up / admin / calibration screens ----
while True:
# Blocks until the GUI produces an event; `values` holds the input fields.
event, values = window.read()
# The two frame-8 buttons are re-labelled at runtime, so their current text
# is what decides which action a BUTTON_MESSAGE_KEY1/2 event means.
f8_btn1_txt = frame8_button1_text.get_text().strip()
f8_btn2_txt = frame8_button2_text.get_text().strip()
if event in (sg.WINDOW_CLOSED, 'Exit'):
# NOTE(review): this only leaves the event loop; execution then continues
# with the polling loop below rather than ending the function -- confirm
# that a closed window cannot leave that loop spinning forever.
break
elif event == ADMIN_FUNCS_KEY:
log_print("USER PULLED UP ADMIN PASSWORD REQUEST PAGE")
gui_frame_msngr_update(3, ENTER_PW_MESSAGE)
admin_logon_attempt = True
elif event == 'RE-PRINT':
log_print('=USER ATTEMPTED TO RE-PRINT=')
if first_job_done:
reset_mouse_position()
# NOTE(review): report_filename is never assigned in this function, so it
# must exist at module level when this runs -- confirm.
print_report(report_filename)
else:
frame8_button1_text.update('BACK')
frame8_button2_text.update('')
gui_frame_msngr_update(8, RUN_FIRST_JOB_FIRST)
elif event == "TERMINATE SCRIPT":
terminate_script()
elif event == "REBOOT":
reboot_system()
elif event == "SHUTDOWN":
shutdown_system()
elif event == 'CALIBRATE':
# Start the external calibration check: clear the failure flag, drive
# B_DUT_MUX low and relabel the frame-8 buttons for the check sequence.
CAL_FAIL = False
log_print("=USER PRESSED 'CALIBRATE'=")
reset_mouse_position()
GPIO.output(B_DUT_MUX, GPIO.LOW)
EXTERNAL_CAL_INPROGRESS = True
frame8_button1_text.update('CHECK LOW RANGE')
frame8_button2_text.update('BACK')
log_print("=INSTRUCTING USER TO MEASURE AND "
"INSERT CALIBRATION RESISTANCES=")
gui_frame_msngr_update(8, CAL_INSERT_FIXTURE_MESSAGE)
# Low-range check: measure, then post a synthetic 'display_measurements'
# event so the result is rendered on the next pass through window.read().
# NOTE(review): display_cal_check_measurements is only ever assigned False in
# this function, so the `not display_cal_check_measurements` guard is always
# true as far as this code shows.
elif event == BUTTON_MESSAGE_KEY1 and \
f8_btn1_txt == 'CHECK LOW RANGE' and \
not display_cal_check_measurements:
reset_mouse_position()
log_print("=USER PRESSED 'CHECK LOW RANGE'=")
gui_frame_msngr_update(2, CHECK_CAL_MESSAGE)
res_measurements, temp_hold = measure_single_catheter('LOW')
window.write_event_value('display_measurements', '')
# High-range check: same pattern as the low-range branch above.
elif event == BUTTON_MESSAGE_KEY1 and \
f8_btn1_txt == 'CHECK HIGH RANGE' and \
not display_cal_check_measurements:
log_print("=USER PRESSED 'CHECK HIGH RANGE'=")
gui_frame_msngr_update(2, CHECK_CAL_MESSAGE)
res_measurements, temp_hold = measure_single_catheter('HIGH')
window.write_event_value('display_measurements', '')
# Operator accepted the calibration check: clear the external-cal scratch
# state and return to frame 1.
elif event == BUTTON_MESSAGE_KEY1 and \
f8_btn1_txt == 'APPROVE & EXIT':
log_print("=USER PRESSED 'APPROVE & EXIT'=")
reset_mouse_position()
EXTERNAL_CAL_INPROGRESS = False
EXTERNAL_CAL_MEASURED_RES_VALS = [[], []]
EXTERNAL_CAL_USER_ENTERED_RES_VALS = [0, 0, 0, 0]
x_list = [[], []]
gui_frame_msngr_update(1)
# Operator wants to redo the calibration reference: zero every correction
# factor and restart the four-value keypad entry sequence (frame 7).
elif event == BUTTON_MESSAGE_KEY2 and \
f8_btn2_txt == 'REDO CAL REF':
reset_mouse_position()
initial_b_low_range, initial_m_low_range = 0, 0
initial_b_high_range, initial_m_high_range = 0, 0
dynamic_m_low_range, dynamic_b_low_range = 0, 0
dynamic_m_high_range, dynamic_b_high_range = 0, 0
# This is the only place cal_res_count is initialised (see 'cal_Submit').
cal_res_count = 0
cal_res_fixture_count = 0
cal_keypad_messager_window.update(
CAL_KEYPAD_PROMPT_MESSAGES[cal_res_count])
gui_frame_msngr_update(7)
# Capture the reference fixtures: first press measures 'LOW' into slot 0,
# second press measures 'HIGH' into slot 1 and then recomputes everything.
elif event == BUTTON_MESSAGE_KEY1 and \
f8_btn1_txt == 'CAPTURE REF RESISTANCES':
gui_frame_msngr_update(2, CAPTURE_CAL_RES_MESSAGE)
EXTERNAL_CAL_MEASURED_RES_VALS[cal_res_fixture_count], x_list[
cal_res_fixture_count] = \
measure_single_catheter(
'LOW' if cal_res_fixture_count == 0 else 'HIGH')
cal_res_fixture_count += 1
if cal_res_fixture_count > 1:
# Both fixtures captured: flatten the per-fixture lists, derive new initial
# correction factors, re-measure with the muxes switched, then persist.
gui_frame_msngr_update(2, RECALCULATE_REF_RES_MESSAGE)
ext_cal_measured_vals_f = list(
chain.from_iterable(EXTERNAL_CAL_MEASURED_RES_VALS))
x_list_f = list(chain.from_iterable(x_list))
# Reference resistances are first taken from the values the user typed in
# (index 0/1 = low range low/high, index 2/3 = high range low/high).
CAL_REF_RESISTANCE_LOWRANGE_HIGH = \
EXTERNAL_CAL_USER_ENTERED_RES_VALS[1]
CAL_REF_RESISTANCE_LOWRANGE_LOW = \
EXTERNAL_CAL_USER_ENTERED_RES_VALS[0]
CAL_REF_RESISTANCE_HIRANGE_HIGH = \
EXTERNAL_CAL_USER_ENTERED_RES_VALS[3]
CAL_REF_RESISTANCE_HIRANGE_LOW = \
EXTERNAL_CAL_USER_ENTERED_RES_VALS[2]
initial_m_low_range, initial_b_low_range, \
initial_m_high_range, initial_b_high_range = \
correction_fctr_calc(x_list_f, ext_cal_measured_vals_f)
# Drive B_DUT_MUX high and wait 0.3 s before measuring (presumably a
# settling delay for the mux -- confirm against the hardware).
GPIO.output(B_DUT_MUX, GPIO.HIGH)
sleep(.3)
low_range_recalculated_interal_resistances, temp_hold = \
measure_single_catheter('LOW')
GPIO.output(CAL_RES_HI_RANGE_MUX, GPIO.HIGH)
sleep(.3)
high_range_recalculated_interal_resistances, temp_hold = \
measure_single_catheter('HIGH')
print('%s\n%s\n%s\n%s\n' % (
initial_m_low_range, initial_b_low_range,
initial_m_high_range, initial_b_high_range))
print('%s\n%s\n%s\n%s\n' % (
low_range_recalculated_interal_resistances[1],
low_range_recalculated_interal_resistances[0],
high_range_recalculated_interal_resistances[1],
high_range_recalculated_interal_resistances[0]
))
# Persist eight lines: m/b low, m/b high, then the four re-measured
# resistances. The 'cal_BACK' branch reads eight values in this same order
# from `lines`.
# NOTE(review): a relative path is used, so the file lands in whatever the
# current working directory is; a failed write raises out of the GUI loop.
with open('initial_correction_factor.txt', 'w') as ff:
ff.write('%s\n%s\n%s\n%s\n' % (
initial_m_low_range, initial_b_low_range,
initial_m_high_range, initial_b_high_range))
ff.write('%s\n%s\n%s\n%s\n' % (
low_range_recalculated_interal_resistances[1],
low_range_recalculated_interal_resistances[0],
high_range_recalculated_interal_resistances[1],
high_range_recalculated_interal_resistances[0]
))
# The user-entered references assigned above are now replaced by the
# re-measured ones (index 0 -> HIGH, index 1 -> LOW).
CAL_REF_RESISTANCE_LOWRANGE_HIGH = \
low_range_recalculated_interal_resistances[0]
CAL_REF_RESISTANCE_LOWRANGE_LOW = \
low_range_recalculated_interal_resistances[1]
CAL_REF_RESISTANCE_HIRANGE_HIGH = \
high_range_recalculated_interal_resistances[0]
CAL_REF_RESISTANCE_HIRANGE_LOW = \
high_range_recalculated_interal_resistances[1]
# Return both mux lines to low before going back to the check sequence.
# NOTE(review): if any call above raises, the mux lines stay high.
GPIO.output(CAL_RES_HI_RANGE_MUX, GPIO.LOW)
GPIO.output(B_DUT_MUX, GPIO.LOW)
frame8_button1_text.update('CHECK LOW RANGE')
frame8_button2_text.update('')
res_measurements = []
reset_mouse_position()
gui_frame_msngr_update(8, CAL_RE_INSRT_LOW_RANGE_FIXTR_MSG)
else:
# Only the first fixture has been captured: prompt for the next one.
reset_mouse_position()
gui_frame_msngr_update(8, CAL_CURRENT_PROCESS_MESSAGES[
cal_res_fixture_count])
elif event == 'display_measurements':
# Synthetic event posted by the CHECK LOW/HIGH RANGE branches: advance the
# button labels to the next step and show the measured values.
log_print("Displaying calibration check values...")
reset_mouse_position()
if f8_btn1_txt == 'CHECK LOW RANGE':
frame8_button1_text.update('CHECK HIGH RANGE')
frame8_button2_text.update('')
elif f8_btn1_txt == 'CHECK HIGH RANGE':
frame8_button1_text.update('APPROVE & EXIT')
frame8_button2_text.update('REDO CAL REF')
gui_frame_msngr_update(8,
show_calcheck_results(
res_measurements, f8_btn1_txt,
cal_res_fixture_count))
display_cal_check_measurements = False
elif event == 'cal_Submit':
# Store one user-entered reference resistance per submit (four in total).
# NOTE(review): float() raises ValueError on an empty or malformed entry,
# which would propagate out of loop(); cal_res_count is a local that is only
# assigned in the 'REDO CAL REF' branch, so reaching this branch first would
# raise UnboundLocalError -- confirm the GUI makes that impossible.
EXTERNAL_CAL_USER_ENTERED_RES_VALS[cal_res_count] = float(
values['calinput'])
window['calinput'].update('')
reset_mouse_position()
cal_res_count += 1
if cal_res_count > 3:
gui_frame_msngr_update(8, CAL_CURRENT_PROCESS_MESSAGES[
cal_res_fixture_count])
frame8_button1_text.update('CAPTURE REF RESISTANCES')
frame8_button2_text.update('')
else:
cal_keypad_messager_window.update(
CAL_KEYPAD_PROMPT_MESSAGES[cal_res_count])
elif event == 'cal_Clear':
log_print("=USER PRESSED 'CLEAR' in CAL KEYPAD")
reset_mouse_position()
cal_keys_entered = ''
window['calinput'].update(cal_keys_entered)
elif event == 'cal_BACK':
# Abandon the redo: restore the correction factors and reference
# resistances from `lines` (the 'REDO CAL REF' branch zeroed them).
# NOTE(review): `lines` is never assigned in this function, so it must be a
# module-level sequence of at least eight numeric strings -- presumably the
# contents of initial_correction_factor.txt read at start-up; confirm. If it
# was read once at start-up it will be stale after a successful redo.
initial_m_low_range = float(lines[0].strip())
initial_b_low_range = float(lines[1].strip())
initial_m_high_range = float(lines[2].strip())
initial_b_high_range = float(lines[3].strip())
CAL_REF_RESISTANCE_LOWRANGE_LOW = float(lines[4].strip())
CAL_REF_RESISTANCE_LOWRANGE_HIGH = float(lines[5].strip())
CAL_REF_RESISTANCE_HIRANGE_LOW = float(lines[6].strip())
CAL_REF_RESISTANCE_HIRANGE_HIGH = float(lines[7].strip())
log_print("=USER PRESSED 'BACK' WHEN "
"IN THE REDO REF CAL KEYPAD=")
reset_mouse_position()
gui_frame_msngr_update(1)
cal_keys_entered = ''
window['calinput'].update(cal_keys_entered)
# Any other event containing 'cal_' is treated as a calibration keypad key;
# the text after 'cal_' is appended to the input field.
# NOTE(review): `in` here is a substring test and raises TypeError if the
# event is not a string; this branch must stay after the specific
# 'cal_Submit' / 'cal_Clear' / 'cal_BACK' branches.
elif 'cal_' in event:
event_name = event.split('cal_')
reset_mouse_position()
cal_keys_entered = values[
'calinput'] # get what's been entered so far
cal_keys_entered += event_name[1] # add the new digit
window['calinput'].update(cal_keys_entered)
elif (event == BUTTON_MESSAGE_KEY1 and f8_btn1_txt == 'BACK') or (
event == BUTTON_MESSAGE_KEY2 and f8_btn2_txt == 'BACK'):
log_print("=USER PRESSED 'BACK' IN CALIBRATE=")
admin_logon_attempt = False
reset_mouse_position()
EXTERNAL_CAL_INPROGRESS = False
gui_frame_msngr_update(1)
elif event == KEYPAD_BACK_BTN_KEY:
# BACK on the numeric keypad: meaning depends on whether an admin log-on is
# pending and on whether a job size has been recorded yet.
if admin_logon_attempt:
admin_logon_attempt = False
log_print("=USER PRESSED 'BACK' ON "
"KEYPAD WHEN PROMPTED FOR ADMIN PW")
gui_frame_msngr_update(1)
# NOTE(review): this is a separate `if`, not an `elif`, so as written it is
# also evaluated after the admin branch above -- confirm that is intended.
if JOB_SIZE == 0:
log_print("=USER PRESSED 'BACK' ON KEYPAD WHEN PROMPTED TO"
" ENTER JOB SIZE OR WHEN IN "
"THE REDO REF CAL KEYPAD=")
gui_frame_msngr_update(1)
else:
log_print("=USER PRESSED 'BACK' ON KEYPAD WHEN "
"PROMPTED FOR JOB NUMBER=")
JOB_SIZE = 0
JOB_NUMBER = 0
window['keypad_message'].update(JOBSIZE_KEYPAD_MESSAGE)
keys_entered = ''
window['input'].update(keys_entered)
reset_mouse_position()
elif event == 'P16x' or event == 'P330' or event == 'P330B':
log_print("=USER SELECTED %s AS THE MODEL=" % event)
# window['keypad_message'].update(JOBSIZE_KEYPAD_MESSAGE)
MODEL_SELECTED = event
model_capture()
gui_frame_msngr_update(3, JOBSIZE_KEYPAD_MESSAGE)
# NOTE(review): `event in '1234567890'` is a substring test, not a
# single-digit test: it is also true for '' and for runs such as '12' or
# '890', and raises TypeError for non-string events.
elif event in '1234567890':
log_print("=USER PRESSED %s ON THE KEYPAD=" % event)
reset_mouse_position()
keys_entered = values[
'input'] # get what's been entered so far
keys_entered += event # add the new digit
window['input'].update(keys_entered)
elif event == 'Submit':
# Submit on the numeric keypad serves three purposes, tried in this order:
# admin password, job size (while JOB_SIZE == 0), then manual job number.
reset_mouse_position()
keys_entered = values['input']
window['input'].update('')
if admin_logon_attempt:
if keys_entered == ADMIN_PASSWORD:
gui_frame_msngr_update(9, get_ip())
else:
window['keypad_message'].update(INVALID_PW_MESSAGE)
else:
if JOB_SIZE == 0:
log_print("=USER PRESSED 'SUBMIT' TO RECORD JOB SIZE=")
jobsize_valid = jobsize_capture(keys_entered)
if jobsize_valid:
log_print("=USER WAS PROMPTED TO "
"SCAN JOB NUMBER BARCODE=")
gui_frame_msngr_update(6,
JOBNUMBER_SCAN_MESSAGE)
# Try the barcode scanner first; fall back to keypad entry if it fails.
jobnumber_valid = jobnumber_capture()
if not jobnumber_valid:
log_print("USER WAS PROMPTED TO "
"ENTER JOB NUMBER MANUALLY")
keys_entered = ''
window['input'].update('')
gui_frame_msngr_update(3, JOBNUM_KEYPAD_MSG)
else:
window['keypad_message'].update(
JOBSIZE_INVALID_MESSAGE)
keys_entered = ''
JOB_SIZE = 0
if jobsize_valid and jobnumber_valid:
log_print(
"VALID JOB SIZE AND JOB NUMBERS WERE RECORDED")
# Leaves the event loop; the polling loop below takes over.
break
elif not jobnumber_valid:
log_print(
"=USER PRESSED 'SUBMIT' TO RECORD JOB NUMBER=")
jobnumber_valid = validate_job_number_barcode_scan(
keys_entered)
if jobnumber_valid:
log_print(
"VALID JOB SIZE AND JOB NUMBERS WERE RECORDED")
JOB_NUMBER = keys_entered
break
else:
window['keypad_message'].update(
JOBNUMBER_INVALID_MESSAGE)
keys_entered = ''
window['input'].update(keys_entered)
elif event == 'Clear':
log_print("=USER PRESSED 'CLEAR'")
reset_mouse_position()
keys_entered = ''
window['input'].update(keys_entered)
# ---- Phase 2: flag-polling loop for calibration, testing and reporting ----
# NOTE(review): window.read() is not called anywhere in this loop, so GUI
# events are not consumed while a job is running -- confirm that the
# gui_frame_msngr_update() helper refreshes the window itself.
while True:
if jobsize_valid and jobnumber_valid and \
not CAL_PASSED and not CAL_FAIL:
log_print(
"USER WAS NOTIFIED THAT CALIBRATION WAS BEING PERFORMED")
gui_frame_msngr_update(2, CAL_PERFORMING_MESSAGE)
# Presumably sets CAL_PASSED or CAL_FAIL; if it sets neither, this branch
# runs again on every pass -- confirm.
calibration()
elif CAL_FAIL and not CAL_PASSED:
# Calibration failed: zero the dynamic correction factors, show the failure
# screen, then reset all job state and go back to the event loop.
log_print("USER WAS NOTIFIED THAT CALIBRATION FAILED")
dynamic_m_low_range, dynamic_b_low_range, \
dynamic_m_high_range, dynamic_b_high_range = 0, 0, 0, 0
frame8_button1_text.update('')
frame8_button2_text.update('')
gui_frame_msngr_update(8, CAL_FAIL_MESSAGE)
# Blocks this thread for 10 s while the failure message is displayed.
sleep(10)
log_print("Calibration failed. "
"An engineer will come troubleshoot the system.")
# NOTE(review): this reset sequence is duplicated almost verbatim in the
# job-finished branch below; CAL_FAIL itself is not cleared here (only the
# 'CALIBRATE' event clears it), and jobsize_valid / jobnumber_valid keep
# their values.
SHOW_LAST_CATH_GUI_MSG = False
catheters_processed = 0
cath_data_dict_unsorted_buffer = {}
CATH_MODEL = -1
JOB_SIZE = 0
JOB_NUMBER = 0
CATH_RES_SAMPLES_COLLECTED = 0
CAL_PASSED = False
first_iteration = True
end_job = False
reset_mouse_position()
gui_frame_msngr_update(1)
notify_user_cath_detected = True
notify_user_test_result = True
break
elif CAL_PASSED and first_iteration:
# First pass after a successful calibration: reconfigure the ADC with
# CONTINUOUS_CONFIG_REG and prompt for the first catheter.
# isr_enable()
configure_adc(CONTINUOUS_CONFIG_REG)
first_iteration = False
log_print("USER WAS PROMPTED TO INSERT "
"THE FIRST CATHETER OF THE JOB")
gui_frame_msngr_update(2, FIRST_CATH_MESSAGE)
# A catheter has been detected and sampling has started: notify once per
# catheter (notify_user_cath_detected is the one-shot latch).
elif not CATH_CONN_EMPTY and ACTIVE_SAMPLING and \
CATH_DETECTED_SAMPLES_COLLECTED == 0 and \
notify_user_cath_detected:
notify_user_cath_detected = False
notify_user_test_result = True
REPEATED_CATHETER_DETECTED = False # flag reset purposes only
PLAY_SOUND = True # flag reset purposes only
log_print("USER WAS NOTIFIED THAT THE CATHETER WAS DETECTED")
gui_frame_msngr_update(2, CATH_DETECTED_MESSAGE)
elif MANUAL_CATH_BARCODE_CAPTURE:
# The catheter test code sets this flag when the barcode scan came back
# empty, asking this (GUI) thread to collect the serial number by keypad.
KEYPAD_CATH_BARCODE = manual_catheter_barcode_entry()
keypad_back_btn.update('BACK')
MANUAL_BARCODE_CAPTURED = True
MANUAL_CATH_BARCODE_CAPTURE = False
# Show the PASS/FAIL screen once per catheter: either a normal mid-job
# result, or the result of the last catheter of the job.
elif ((TEST_FINISHED and not MEASURE_OUTPUT_RESISTANCE and
not SHOW_LAST_CATH_GUI_MSG and not end_job) or
(catheters_processed == JOB_SIZE and
SHOW_LAST_CATH_GUI_MSG and
not end_job)) and notify_user_test_result:
notify_user_test_result = False
# Frame 4 is used for "PASS", frame 5 for any other result.
frame_to_see = 4 if GUI_CURRENT_CATH_RESULT == "PASS" else 5
log_print(
"USER WAS NOTIFIED THAT %s TEST RESULT IS: %s" % (
GUI_CURRENT_BARCODE, GUI_CURRENT_CATH_RESULT))
gui_frame_msngr_update(frame_to_see, show_results(
GUI_CURRENT_CATH_RESULT, GUI_CURRENT_BARCODE))
# resetting flag for next possible catheter
notify_user_cath_detected = True
if PLAY_SOUND:
audio_feedback(GUI_CURRENT_CATH_RESULT)
# NOTE(review): this `pass` has no effect.
pass
elif catheters_processed == JOB_SIZE and end_job:
# Job complete: reset all job state, return to frame 1, print the report,
# and go back to the event loop for the next job.
log_print("Job finished. Resetting flags and printing report.")
SHOW_LAST_CATH_GUI_MSG = False
catheters_processed = 0
cath_data_dict_unsorted_buffer = {}
CATH_MODEL = -1
JOB_SIZE = 0
JOB_NUMBER = 0
CATH_RES_SAMPLES_COLLECTED = 0
CAL_PASSED = False
first_iteration = True
end_job = False
reset_mouse_position()
gui_frame_msngr_update(1)
first_job_done = True
notify_user_cath_detected = True
notify_user_test_result = True
print_report(report_filename)
break
# Poll interval of 50 ms (presumably once per pass of the polling loop).
sleep(.05)
# NOTE(review): no `break` above targets the outermost `while True`, so if
# this call sits after that loop it is unreachable -- confirm indentation.
window.close()
if __name__ == "__main__":
    # Script entry point. Guarded so that importing this module (for tests or
    # tooling) does not start the hardware set-up and the GUI loop.
    try:
        # Project helpers defined elsewhere in this file / its init modules;
        # they are called in the same order as before.
        alrt_rdy_func_enable()
        isr_enable()
        no_blank_screen()
        loop()
    finally:
        # Release every GPIO channel this process configured, whether loop()
        # returned or an unhandled exception unwound out of it, so the pins
        # are not left driven for the next run.
        GPIO.cleanup()
`sys.path.append` is suspicious and concerning. Why is it needed?