I am working in a Linux environment and trying to automate these two Linux commands with a Python script:
- sudo rfcomm connect 0 XX:XX:XX:XX:XX:XX
- hcitool rssi XX:XX:XX:XX:XX:XX
Command 1 connects to my Bluetooth device, and command 2 returns the received signal strength indication (RSSI) for as long as the device is connected. The script must therefore be able to connect and then extract the RSSI without any human intervention. I am running these commands using the subprocess module.
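For reference, a minimal sketch of how the RSSI value could be pulled out of command 2's output. It assumes hcitool prints a line of the form "RSSI return value: -5"; the helper name parse_rssi is hypothetical and not part of any library.

```python
import re
from typing import Optional

# Assumption: `hcitool rssi` prints a line such as "RSSI return value: -5".
_RSSI_RE = re.compile(r'RSSI return value:\s*(-?\d+)')


def parse_rssi(output: str) -> Optional[int]:
    """Return the RSSI as an int, or None if no RSSI line is present."""
    match = _RSSI_RE.search(output)
    return int(match.group(1)) if match else None
```

Returning None rather than raising lets the caller treat "no RSSI in the output" as a signal that the device may have disconnected.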
The problem: when command 1 runs and a connection is made, it prints "Connected /dev/rfcomm0 to XX:XX:XX:XX:XX:XX on channel 1 Press CTRL-C for hangup", but it does not return. It only returns once the connection is lost, at which point it is no longer possible to extract the RSSI. As a result, the Python script does not continue to the next line of code after running command 1, so I am unable to run command 2 afterwards. Here is my code:
import subprocess
import time

shimmer_id = 'XX:XX:XX:XX:XX:XX'  # Bluetooth address of the device

def get_rssi():
    while True:
        p1 = subprocess.run(['hcitool', 'rssi', shimmer_id], capture_output=True, text=True)
        if p1.stderr:
            print('stderr:', p1.stderr)
            reconnect()
        if p1.stdout:
            print('stdout:', p1.stdout)
        time.sleep(0.3)

def reconnect():
    reconnected = False
    print('[-] lost connection, reconnecting...')
    while not reconnected:
        p2 = subprocess.run(['sudo', 'rfcomm', 'connect', '0', shimmer_id], capture_output=True, text=True)
        # If the connection on the previous line is successful, nothing is executed from here onward! The script only continues once the above line returns (i.e. when the connection is lost)
        if p2.stderr:
            # text=True already yields str, so no .decode() is needed
            if 'Host is down' in p2.stderr:
                print('Host is down')
                time.sleep(0.3)
                # the while loop retries; no recursive call is required
        else:
            reconnected = True

get_rssi()
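The blocking in reconnect() comes from subprocess.run, which waits for the child to exit. One possible way around it, shown here only as a sketch, is subprocess.Popen, which returns as soon as the child has started. The helper names start_connection, is_alive and stop_connection are hypothetical, and the sketch assumes sudo does not prompt for a password.

```python
import subprocess


def start_connection(cmd):
    """Start a long-running command without waiting for it to exit."""
    # Popen returns immediately; the child keeps running in the background.
    return subprocess.Popen(
        cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )


def is_alive(proc):
    """Return True while the child is running (poll() is None until exit)."""
    return proc.poll() is None


def stop_connection(proc, timeout=2.0):
    """Ask the child to exit; kill it if it does not exit within timeout."""
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


# Hypothetical usage with the commands from the question:
#   conn = start_connection(['sudo', 'rfcomm', 'connect', '0', shimmer_id])
#   ... give the link a moment to come up, then run `hcitool rssi` ...
#   if not is_alive(conn): the connection was lost, so start it again
```

Because the child here is started through sudo, terminate() from an unprivileged script may fail with PermissionError; that part would need adapting to the local setup.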
I have tried threading and multiprocessing, but they do not help, since command 1 never "joins", in Python threading lingo. I am aware that I probably need to run command 1 asynchronously, but I am struggling to implement it properly in Python, even while referring to the asyncio docs, in particular asyncio.create_subprocess_shell. Any help is much appreciated!
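A rough asyncio sketch of the asynchronous approach described above: the connect command is started but not awaited, and the short command is run repeatedly while the first one is still alive. It uses asyncio.create_subprocess_exec rather than create_subprocess_shell so that no shell is involved. The function names, the handle callback, and the settle delay are all assumptions made for illustration.

```python
import asyncio


async def run_once(cmd):
    """Run a short command to completion; return (returncode, stdout, stderr)."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    return proc.returncode, out.decode(), err.decode()


async def poll_while_connected(connect_cmd, poll_cmd, handle=print,
                               interval=0.3, settle=2.0, max_polls=None):
    """Keep connect_cmd running in the background and poll with poll_cmd."""
    # Start the long-running command; we do not wait for it to finish.
    conn = await asyncio.create_subprocess_exec(
        *connect_cmd,
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    await asyncio.sleep(settle)  # assumed grace period for the link to come up
    polls = 0
    try:
        # returncode stays None for as long as the background command runs.
        while conn.returncode is None and (max_polls is None or polls < max_polls):
            handle(await run_once(poll_cmd))
            polls += 1
            await asyncio.sleep(interval)
    finally:
        if conn.returncode is None:
            conn.terminate()
            await conn.wait()
    return polls


# Hypothetical usage with the commands from the question:
#   asyncio.run(poll_while_connected(
#       ['sudo', 'rfcomm', 'connect', '0', shimmer_id],
#       ['hcitool', 'rssi', shimmer_id]))
```

When the loop ends because the background command exited, the caller can simply call poll_while_connected again to reconnect.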