Multi Thread of ROS 2 with Callback group - Part 2: Mutually exclusive

May 19, 2023
This article investigates ROS2 multi-threaded executor based on the callback group. In ROS 2, we can assign a callback to a callback group whose types are either Reentrant or Mutually exclusive.

In the previous article, we discussed what is Reentrant group and the thread scheduling behind the scene. Today, we will look into the other callback group: Mutually Exclusive callback group. I assume that you have a preliminary understanding of the callback groups.

Key takeaways

1. Callbacks in a same MutuallyExclusive group are NOT exactly same with SingleThreadedExecutor.

In the official documentation (opens in a new tab), it is said that:

Mutually Exclusive Callback Group prevents its callbacks from being executed in parallel - essentially making it as if the callbacks in the group were executed by a SingleThreadedExecutor.

Then, some might expect that the callbacks in a same MutuallyExclusive group are working exactly the same as SingleThreadedExecutor processes the callbacks. For a delayed processing of the callbacks, this is not true.

In my previous article, callbacks in a SingleThreadedExecutor are processed in round robin (a.k.a., not FIFO) if the callbacks take more time than the speed of event happening. But MutuallyExclusive group was not found to show this round-robin behavior.

2. Of course, callbacks in different Mutually Exclusive groups are not thread-safe.

If you are expecting from rclcpp that this will keep thread-safety between different MutuallyExclusive groups, then do not. You should use mutex, and define critical sections.

Testbed: publisher and subscriber nodes

The overall scenario is very similar to the previous article. You can find all the codes here (opens in a new tab)

Publisher node

This node (opens in a new tab) publishes short_topic at 10Hz and long_topic at 1Hz. The example run output is:

jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester publisher_node
[INFO] [1684487967.263839506] [publisher_node]: Short message (seq=0)
[INFO] [1684487967.363765263] [publisher_node]: Short message (seq=1)
...
[INFO] [1684487968.063749259] [publisher_node]: Short message (seq=8)
[INFO] [1684487968.163744316] [publisher_node]: Short message (seq=9)
[INFO] [1684487968.163889616] [publisher_node]: Long message with more characters (seq=0)
[INFO] [1684487968.263754273] [publisher_node]: Short message (seq=10)
 

Subscriber node

This node (opens in a new tab) has three callbacks:

  • (Setters) two subscription callbacks for short_topic and long_topic messages. This callbacks sets the incoming messages into member variables processed_short_string_ and processed_long_string_ of MultiThreadMutuallyExclusiveSubscriber node. processed_*_string_ is created by adding starts to the both ends of received messages. For exepriment, we assume that coping a character takes 0.1s. Thus, processing callbacks requires more than their publish rates. (0.1s and 1s.)

    multi_thread_mutually_exclusive_subscriber_node.cc
    ...
    void MultiThreadMutuallyExclusiveSubscriber::ProcessString(
        const std::string &raw_string, std::string &output) {
      output = "** ";
      for (char c : raw_string) {
        output.push_back(c);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
      }
      output += " **";
    };
    ...
    void MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback(
      const std_msgs::msg::String::SharedPtr msg) {
      ...
      ProcessString(msg->data, processed_long_string_);
      RCLCPP_INFO(get_logger(), "Setting processed:  %s",
                  processed_long_string_.c_str());
      ...
     
    }
     
  • (Getter) a timer callback which prints the member variables.

    void MultiThreadMutuallyExclusiveSubscriber::TimerCallback() {
    ...
    RCLCPP_INFO(get_logger(),
                "Getting processed strings: \n  [long] %s \n [short] %s",
                processed_long_string_.c_str(), processed_short_string_.c_str());
    ...
     
    }

    Example prints are:

    [INFO] [1684486266.634665189] [subscriber_node]: Getting processed strings:
    [long] ** Long message with more characters (seq=31) **
    [short] ** Short message (seq=384) **
     

Testing

1. Putting callbacks together in a same MutuallyExclusive group

For this test, let us assume that TimerCallback() is not used, having only two subscription callbacks.

class MultiThreadMutuallyExclusiveSubscriber : public rclcpp::Node {
 
public:
  MultiThreadMutuallyExclusiveSubscriber(rclcpp::NodeOptions node_options)
      : Node("subscriber_node",
             node_options.allow_undeclared_parameters(true)) {
    rclcpp::SubscriptionOptions options;
    options.callback_group =
        create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
 
    short_subscriber_ = create_subscription<std_msgs::msg::String>(
        "/short_topic", rclcpp::QoS(10),
        std::bind(&MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback,
                  this, std::placeholders::_1),
        options);
 
    long_subscriber_ = create_subscription<std_msgs::msg::String>(
        "/long_topic", rclcpp::QoS(10),
        std::bind(&MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback,
                  this, std::placeholders::_1),
        options);
 
    ...
 

We register the node into MultiThreadedExecutor.

int main(int argc, char **argv) {
  rclcpp::init(argc, argv);
  rclcpp::NodeOptions option;
  auto node = std::make_shared<MultiThreadMutuallyExclusiveSubscriber>(option);
  rclcpp::executors::MultiThreadedExecutor executor;
  executor.add_node(node);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}
 

Results

Publisher node output the below:

jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester publisher_node
[INFO] [1684497083.896133935] [publisher_node]: Short message (seq=0)
[INFO] [1684497083.996157929] [publisher_node]: Short message (seq=1)
[INFO] [1684497084.096092124] [publisher_node]: Short message (seq=2)
...
[INFO] [1684497084.596053297] [publisher_node]: Short message (seq=7)
[INFO] [1684497084.696097192] [publisher_node]: Short message (seq=8)
[INFO] [1684497084.796039187] [publisher_node]: Short message (seq=9)
[INFO] [1684497084.796129187] [publisher_node]: Long message with more characters (seq=0)
[INFO] [1684497084.896058682] [publisher_node]: Short message (seq=10)
[INFO] [1684497084.996097976] [publisher_node]: Short message (seq=11)
[INFO] [1684497085.096098071] [publisher_node]: Short message (seq=12)
[INFO] [1684497085.196101466] [publisher_node]: Short message (seq=13)
...
[INFO] [1684497085.796172334] [publisher_node]: Short message (seq=19)
[INFO] [1684497085.796503034] [publisher_node]: Long message with more characters (seq=1)
[INFO] [1684497085.896102029] [publisher_node]: Short message (seq=20)
[INFO] [1684497085.996114123] [publisher_node]: Short message (seq=21)
...
[INFO] [1684497086.596048091] [publisher_node]: Short message (seq=27)
[INFO] [1684497086.696198386] [publisher_node]: Short message (seq=28)
...
[INFO] [1684497087.196084460] [publisher_node]: Short message (seq=33)
[INFO] [1684497087.296135654] [publisher_node]: Short message (seq=34)
...

This is subscription result:

MultiThreadedExecutor with MutuallyExclusive result
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester multi_thread_mutually_exclusive_subscriber_node
[INFO] [1684497086.000256023] [subscriber_node]: Setting processed:  ** Short message (seq=0) **
[INFO] [1684497088.205206606] [subscriber_node]: Setting processed:  ** Short message (seq=12) **
[INFO] [1684497090.409344367] [subscriber_node]: Setting processed:  ** Short message (seq=34) **
[INFO] [1684497092.613088724] [subscriber_node]: Setting processed:  ** Short message (seq=56) **
[INFO] [1684497094.816876581] [subscriber_node]: Setting processed:  ** Short message (seq=78) **
[INFO] [1684497097.121857358] [subscriber_node]: Setting processed:  ** Short message (seq=100) **
[INFO] [1684497099.426175711] [subscriber_node]: Setting processed:  ** Short message (seq=123) **
[INFO] [1684497101.730305265] [subscriber_node]: Setting processed:  ** Short message (seq=146) **
[INFO] [1684497104.034430018] [subscriber_node]: Setting processed:  ** Short message (seq=169) **
 

Is is very clear: callback for long_topic was hardly handled. This is very different from the round-robin results of SingleThreadedExecutor (please ignore timestamps):

SingleThreadedExecutor result
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester single_thread_subscriber_node
[INFO] [1684494333.628047147] [subscriber_node]: Setting processed:  ** Short message (seq=0) **
[INFO] [1684494335.628859766] [subscriber_node]: Setting processed:  ** Short message (seq=11) **
[INFO] [1684494337.629064684] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=0) **
[INFO] [1684494339.629355003] [subscriber_node]: Setting processed:  ** Short message (seq=51) **
[INFO] [1684494341.629804743] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=1) **
[INFO] [1684494343.630044844] [subscriber_node]: Setting processed:  ** Short message (seq=91) **
[INFO] [1684494345.630372444] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=2) **
[INFO] [1684494347.630739145] [subscriber_node]: Setting processed:  ** Short message (seq=131) **

Apparently, the first result suggests that MultiThreadedExecutor for MutuallyExclusive callback groups processes only short_topic as its callback has more chances to lock the mutex than long_topic. It is working as if SingleThreadedExecutor only for short_topic:

  • short topic seq=0 was published and subscribed at 83s.
  • short topic seq=0 was finished for processing at 86s.
  • short topic seq=12 was received at 85 and the first in the queue at 86 (queue size = 10).

This skipping behavior was reported in the issues (opens in a new tab), where a contributor Barry-Xu-2018 said (opens in a new tab) MultiThreadedExecutor can clear or (skip) (opens in a new tab) some waiting entities due to mutex related issues.

2. Putting callbacks in different MutuallyExclusive groups

Now, we consider the timer callback to check thread safety (setters from subscribers vs getter from timer callback), and assign subscribers to different groups as below:

class MultiThreadMutuallyExclusiveSubscriber : public rclcpp::Node {
 
public:
  MultiThreadMutuallyExclusiveSubscriber(rclcpp::NodeOptions node_options)
      : Node("subscriber_node",
             node_options.allow_undeclared_parameters(true)) {
    rclcpp::SubscriptionOptions options;
    options.callback_group =
        create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
 
    short_subscriber_ = create_subscription<std_msgs::msg::String>(
        "/short_topic", rclcpp::QoS(10),
        std::bind(&MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback,
                  this, std::placeholders::_1),
        options);
 
    options.callback_group =
        create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
    long_subscriber_ = create_subscription<std_msgs::msg::String>(
        "/long_topic", rclcpp::QoS(10),
        std::bind(&MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback,
                  this, std::placeholders::_1),
        options);
 
    timer_ = create_wall_timer(
        1s, std::bind(&MultiThreadMutuallyExclusiveSubscriber::TimerCallback,
                      this));
 

Not assigning any callback_group to create_wall_timer equals to the default MutuallyExclusive callback group which is another callback group.

If you just run multi_thread_reentrant_subscriber_node as: (opens in a new tab)

ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node

Initially it will not use mutexes. Thus, what we have is:

jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester multi_thread_mutually_exclusive_subscriber_node
[INFO] [1684499585.741976759] [subscriber_node]: Getting processed strings:
  [long]
 [short]
[INFO] [1684499586.741949677] [subscriber_node]: Getting processed strings:
  [long]
 [short]
...
[INFO] [1684499589.926415389] [subscriber_node]: Setting processed:  ** Short message (seq=0) **
[INFO] [1684499592.830046581] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=0) **
[INFO] [1684499592.830167081] [subscriber_node]: Getting processed strings:
[INFO] [1684499596.937670591] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=1) **
[INFO] [1684499596.937803491] [subscriber_node]: Getting processed strings:
  [long] ** Long message with more characters (seq=1) **
 [short] ** Short message (seq=12) **
[INFO] [1684499599.141354135] [subscriber_node]: Setting processed:  ** Short message (seq=63) **
[INFO] [1684499601.044588832] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=2) **
...
[INFO] [1684499727.951774404] [subscriber_node]: Getting processed strings:
  [long]
 [short]
[INFO] [1684499728.951765511] [subscriber_node]: Getting processed strings:
  [long] ** L
 [short] ** Short mess
[INFO] [1684499729.951769118] [subscriber_node]: Getting processed strings:
  [long] ** Long messag
 [short] ** Short message (seq=0
[INFO] [1684499730.100030734] [subscriber_node]: Setting processed:  ** Short message (seq=0) **
[INFO] [1684499730.951765526] [subscriber_node]: Getting processed strings:
  [long] ** Long message with mor
 [short] ** Short mes
[INFO] [1684499731.951739133] [subscriber_node]: Getting processed strings:
  [long] ** Long message with more characte
 [short] ** Short message (seq=
[INFO] [1684499732.302576970] [subscriber_node]: Setting processed:  ** Short message (seq=12) **
 

In conclusion, thread safety is not guaranteed. We now use two mutexes to protect thread safety as below:

void MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback(
    const std_msgs::msg::String::SharedPtr msg) {
  std::shared_ptr<std::unique_lock<std::mutex>> lock;
  if (use_mutex)
    mutex_list.short_topic_mutex.lock();
  ProcessString(msg->data, processed_short_string_);
  RCLCPP_INFO(get_logger(), "Setting processed:  %s",
              processed_short_string_.c_str());
  if (use_mutex)
    mutex_list.short_topic_mutex.unlock();
}

This code is intended for a thread safety experiment, and it is a bad design. We should minimize the critical sections and not include ProcessString(...); in the mutex zone.

subscriber_node will use mutex by param set:

ros2 param set /subscriber_node use_mutex true

The result? Thread safe!:

jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester multi_thread_mutually_exclusive_subscriber_node
[INFO] [1684500853.761733245] [subscriber_node]: Getting processed strings:
  [long]
 [short]
[INFO] [1684500854.761738832] [subscriber_node]: Getting processed strings:
  [long]
 [short]
 [short]
[INFO] [1684500858.252912414] [subscriber_node]: will use mutex
[INFO] [1684500858.761723597] [subscriber_node]: Getting processed strings:
  [long]
 [short]
[INFO] [1684500859.761786863] [subscriber_node]: Getting processed strings:
  [long]
 [short]
[INFO] [1684500860.761909929] [subscriber_node]: Getting processed strings:
[INFO] [1684500870.203406757] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=1) **
[INFO] [1684500870.203533957] [subscriber_node]: Getting processed strings:
  [long] ** Long message with more characters (seq=1) **
 [short] ** Short message (seq=12) **
[INFO] [1684500872.407167598] [subscriber_node]: Setting processed:  ** Short message (seq=63) **
[INFO] [1684500874.310072249] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=2) **
[INFO] [1684500874.310223049] [subscriber_node]: Getting processed strings:
  [long] ** Long message with more characters (seq=2) **
 [short] ** Short message (seq=63) **
[INFO] [1684500876.613996191] [subscriber_node]: Setting processed:  ** Short message (seq=104) **
[INFO] [1684500878.417606122] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=3) **
[INFO] [1684500878.417995222] [subscriber_node]: Getting processed strings:
  [long] ** Long message with more characters (seq=3) **
 [short] ** Short message (seq=104) **
[INFO] [1684500880.722269197] [subscriber_node]: Setting processed:  ** Short message (seq=146) **
[INFO] [1684500882.524751487] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=7) **
[INFO] [1684500882.524907086] [subscriber_node]: Getting processed strings:
  [long] ** Long message with more characters (seq=7) **