Going by the description of your problem, and taking some liberties from my side, the following is a "quick-n-dirty" solution that creates a ForceSensorInterface class that inherits from the rclpy.node.Node class.
Python code:
#!/usr/bin/env python3
import serial
import serial.rs485
import RPi.GPIO as GPIO
import struct
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64
from std_srvs.srv import Trigger
class ForceSensorInterface(Node):
def __init__(self, pub_rate:int = 50, address:int = 1):
super().__init__('force_sensor_interface')
self.readings_publisher_ = self.create_publisher(Float64, '/force_sensor/readings', 10)
self.pub_rate = pub_rate # Define your rate of publishing here
# set the address of the serial device here
# Might be of interest to set this as a ROS parameter
from rcl_interfaces.msg import ParameterDescriptor # useful to have doc of what the parameters does
address_parameter_descriptor = ParameterDescriptor(description='Set the address of the device to read from serial port')
self.declare_parameter('address', address, address_parameter_descriptor)
# you can run ros2 param describe /force_sensor_interface address to see the type and description.
# This timer will handle publishing the sensor readings as per the specified
# publication rate:
self.timer = self.create_timer(1.0/self.pub_rate, self.read_tension)
# service routine to handle taring the sensor:
self.tare_srv = self.create_service(Trigger, '/force_sensor/tare', self.tare)
def read_tension(self): #read tension of the specified sensor
address = self.get_parameter('address').get_parameter_value()
if address ==1:
send = [0x01,0x03,0x00,0x00,0x00,0x02,0xC4,0x0B]
msg = Float64()
GPIO.output(TXDEN_1, GPIO.HIGH)
ser.write(serial.to_bytes(send))
GPIO.output(TXDEN_1, GPIO.LOW)
response = ser.read(ser.inWaiting())
hex_data= ([format(x, '02x') for x in response])
msg.data = struct.unpack('!f',bytes.fromhex(''.join(hex_data[3:7])))[0]
self.readings_publisher_.publish(msg)
def tare(self, request, response): #Tare the specified sensor
address = self.get_parameter('address').get_parameter_value()
if address ==1:
send = [0x01,0x10,0x1A,0x00,0x00,0x01,0x02,0x00, 0xBB, 0x5D, 0xE2]
self.get_logger().info('Received request to tare sensor')
GPIO.output(TXDEN_1, GPIO.HIGH) #write enabled for sending data frame to read the register
ser.write(serial.to_bytes(send))
GPIO.output(TXDEN_1, GPIO.LOW)
response = ser.read(ser.inWaiting())
hex_data= ''.join([format(x, '02x') for x in response])
if hex_data == tare_check:
print('tared no.', address)
# Set the success flag to true or false as per your logic/hardware structure
response.success = True
response.message = "sensor tare success!"
return response
def main(args=None):
rclpy.init(args=args)
force_sensor_interface = ForceSensorInterface()
rclpy.spin(force_sensor_interface)
force_sensor_interface.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Code explanation
The class initializes a publisher for sensor readings, I am not aware of how your particular sensor works but it made more sense to me for the sensor to continuously publish its readings over a topic named /force_sensor/readings. The __init__ constructor takes in a pub_rate parameter that allows you to set how frequently you get sensor readings; as it is tied to your hardware and you know it best!
Changing the device being read is implemented using a parameter named address that defaults to a parameter passed into the class' constructor named address as well.
Taring your sensor is done through a service call under the topic /force_sensor/tare.
The two main functions are basically the same samples you have provided but "ROS-ified".
In the main function, we simply initialize a node, create an object of the ForceSensorInterface class, and spin the node.
The code should act as a general pointer towards how to use publishers, services, and parameters to control your logic, and you should be able to change as per your requirements (e.g. changing the pressure readings to be a service call instead of a service call). The footnotes include a couple of tutorials that might be of help for you. Good luck and happy coding!
Resources: