1
$\begingroup$

I have a force sensor that I want to use with ROS2, however, I don't know how to create a ROS2 wrapper for it. All I found was this tutorial that creates a ROS1 wrapper. Below is a snippet of the sensor python code I wrote, that I want it to be interfaced with ROS2:

import serial 
import serial.rs485 
import RPi.GPIO as GPIO 
import struct

def read_tension(address): #read tension of the specified sensor
    if address ==1:
        send  = [0x01,0x03,0x00,0x00,0x00,0x02,0xC4,0x0B]

    GPIO.output(TXDEN_1, GPIO.HIGH)
    ser.write(serial.to_bytes(send))
    GPIO.output(TXDEN_1, GPIO.LOW)
  
    response = ser.read(ser.inWaiting())
    hex_data= ([format(x, '02x') for x in response])
    return struct.unpack('!f',bytes.fromhex(''.join(hex_data[3:7])))[0]
       
    
    
    
def tare(address): #Tare the specified sensor
    if address ==1:
        send  = [0x01,0x10,0x1A,0x00,0x00,0x01,0x02,0x00, 0xBB, 0x5D, 0xE2]

    GPIO.output(TXDEN_1, GPIO.HIGH) #write enabled for sending data frame to read the register
    ser.write(serial.to_bytes(send))
    GPIO.output(TXDEN_1, GPIO.LOW)

    response = ser.read(ser.inWaiting())
    hex_data= ''.join([format(x, '02x') for x in response])
    
    if hex_data == tare_check:
        print('tared no.', address)

I think using services would be the best for my case, correct?

$\endgroup$
2
  • 1
    $\begingroup$ What are you trying to achieve with the sensor? You said you though using services would be the best - why? $\endgroup$ Commented Apr 17, 2024 at 10:50
  • $\begingroup$ @Nikolai want to simply use the sensor with ROS2. The sensor uses CAN Bus (half duplex) and I have 4 sensors, so when I need to talk to any of them, I will send the commands (the hex lists) and hear back from that specific sensor. Due to request-response style of the sensor, I thought services would be the best $\endgroup$ Commented Apr 18, 2024 at 7:23

2 Answers 2

3
+50
$\begingroup$

Going by the description of your problem, and taking some liberties from my side, the following is a "quick-n-dirty" solution that creates a ForceSensorInterface class that inherits from the rclpy.node.Node class.

Python code:

#!/usr/bin/env python3

import serial 
import serial.rs485 
import RPi.GPIO as GPIO 
import struct

import rclpy
from rclpy.node import Node

from std_msgs.msg import Float64
from std_srvs.srv import Trigger


class ForceSensorInterface(Node):
    def __init__(self, pub_rate:int = 50, address:int = 1):
        super().__init__('force_sensor_interface')
        self.readings_publisher_ = self.create_publisher(Float64, '/force_sensor/readings', 10)
        self.pub_rate = pub_rate  # Define your rate of publishing here
        
        # set the address of the serial device here
        # Might be of interest to set this as a ROS parameter
        from rcl_interfaces.msg import ParameterDescriptor  # useful to have doc of what the parameters does
        
        address_parameter_descriptor = ParameterDescriptor(description='Set the address of the device to read from serial port')
        self.declare_parameter('address', address, address_parameter_descriptor)        
        # you can run ros2 param describe /force_sensor_interface address to see the type and description.
        
        # This timer will handle publishing the sensor readings as per the specified
        # publication rate:
        self.timer = self.create_timer(1.0/self.pub_rate, self.read_tension)
        
        # service routine to handle taring the sensor:
        self.tare_srv = self.create_service(Trigger, '/force_sensor/tare', self.tare)
    
    def read_tension(self): #read tension of the specified sensor
        address = self.get_parameter('address').get_parameter_value()
        if address ==1:
            send  = [0x01,0x03,0x00,0x00,0x00,0x02,0xC4,0x0B]
        
        msg = Float64()
        GPIO.output(TXDEN_1, GPIO.HIGH)
        ser.write(serial.to_bytes(send))
        GPIO.output(TXDEN_1, GPIO.LOW)

        response = ser.read(ser.inWaiting())
        hex_data= ([format(x, '02x') for x in response])
        msg.data = struct.unpack('!f',bytes.fromhex(''.join(hex_data[3:7])))[0]
        
        self.readings_publisher_.publish(msg)

    def tare(self, request, response): #Tare the specified sensor
        address = self.get_parameter('address').get_parameter_value()
        if address ==1:
            send  = [0x01,0x10,0x1A,0x00,0x00,0x01,0x02,0x00, 0xBB, 0x5D, 0xE2]

        self.get_logger().info('Received request to tare sensor')
        
        GPIO.output(TXDEN_1, GPIO.HIGH) #write enabled for sending data frame to read the register
        ser.write(serial.to_bytes(send))
        GPIO.output(TXDEN_1, GPIO.LOW)

        response = ser.read(ser.inWaiting())
        hex_data= ''.join([format(x, '02x') for x in response])
        
        if hex_data == tare_check:
            print('tared no.', address)
        
        # Set the success flag to true or false as per your logic/hardware structure
        response.success = True
        response.message = "sensor tare success!"
        
        return response


def main(args=None):
    rclpy.init(args=args)
    
    force_sensor_interface = ForceSensorInterface()
    
    rclpy.spin(force_sensor_interface)
    force_sensor_interface.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Code explanation

The class initializes a publisher for sensor readings, I am not aware of how your particular sensor works but it made more sense to me for the sensor to continuously publish its readings over a topic named /force_sensor/readings. The __init__ constructor takes in a pub_rate parameter that allows you to set how frequently you get sensor readings; as it is tied to your hardware and you know it best!

Changing the device being read is implemented using a parameter named address that defaults to a parameter passed into the class' constructor named address as well.

Taring your sensor is done through a service call under the topic /force_sensor/tare.

The two main functions are basically the same samples you have provided but "ROS-ified".

In the main function, we simply initialize a node, create an object of the ForceSensorInterface class, and spin the node.

The code should act as a general pointer towards how to use publishers, services, and parameters to control your logic, and you should be able to change as per your requirements (e.g. changing the pressure readings to be a service call instead of a service call). The footnotes include a couple of tutorials that might be of help for you. Good luck and happy coding!

Resources:

$\endgroup$
1
$\begingroup$

I am not sure why you are asking for the service example. But here is a quick example, following this tutorial. Also not sure what tare function is doing. But you can do similar thing for that fucntion as well in the same class and Node.

from node_name.srv import SrvType

import rclpy
from rclpy.node import Node
# Do other Import according to need

class GetSensorDataSever(Node):

    def __init__(self):
        super().__init__('get_sensor_data')
        self.read_srv = self.create_service(SrvType, '/ask_sensor_data', self.read_data)

    def read_data(self, request, response):
        if request.address ==1: 
            send  = [0x01,0x03,0x00,0x00,0x00,0x02,0xC4,0x0B]

        GPIO.output(TXDEN_1, GPIO.HIGH)
        ser.write(serial.to_bytes(send))
        GPIO.output(TXDEN_1, GPIO.LOW)

        data = ser.read(ser.inWaiting())  
        hex_data= ([format(x, '02x') for x in data])
        response.res = struct.unpack('!f',bytes.fromhex(''.join(hex_data[3:7])))[0] 
        # here response.res should be the same data type as the return value defined in .srv file
        # If type is not standard, you can always convert it to string and put string msg in response

    return response

def main():
    rclpy.init()

    _service = GetSensorDataSever()

    rclpy.spin(_service)

    rclpy.shutdown()


if __name__ == '__main__':
    main()
$\endgroup$

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.