Multi-thread of ROS 2 with Callback Group - Part 1: Reentrant

May 15, 2023
This article investigates ROS2 multi-threaded executor based on the callback group. In ROS 2, we can assign a callback to a callback group whose types are either Reentrant or Mutually exclusive.
👾

Recommend to read previous article. All the publisher and subscription system is the same, and I will not explain them here.

In the previous article, I wrote about the single threading of callback processing in ROS2. Here, let me cover multi-threaded callback processing. The publisher node and the problem scenario is similar with the previous article article.

In ROS2, almost every operations are processed in callbacks (subscription, services, timers...): they are queued at corresponding events (e.g., subscription callback is queued when message arrived.) and called (executed) by executors (opens in a new tab).

This article is about how ROS 2 provides a utility when executing callbacks on multiple-threads. First of all, we have MultiThreadedExecutor where callbacks are executed on multiple threads:

main.cpp
int main(int argc, char **argv) {
  rclcpp::init(argc, argv);
  auto node = std::make_shared<MultiThreadReentrantSubscriber>();
  rclcpp::executors::MultiThreadedExecutor executor;
  executor.add_node(node);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}

How are executor threading (scheduling) the callbacks defined inside node? Callback groups (opens in a new tab) are the key to understanding:

  • Reentrant Callback Group
  • Mutually Exclusive Callback Group

For example, we register callbacks into a reentrant callback group by:

multi_thread_reentrant_subscriber_node.cc
MultiThreadReentrantSubscriber() : Node("subscriber_node") {
  rclcpp::SubscriptionOptions options;
  options.callback_group =
      create_callback_group(rclcpp::CallbackGroupType::Reentrant);
 
  short_subscriber_ = create_subscription<std_msgs::msg::String>(
      "/short_topic", rclcpp::QoS(10),
      std::bind(&MultiThreadReentrantSubscriber::ShortTopicCallback, this,
                std::placeholders::_1),
      options);
 
  long_subscriber_ = create_subscription<std_msgs::msg::String>(
      "/long_topic", rclcpp::QoS(10),
      std::bind(&MultiThreadReentrantSubscriber::LongTopicCallback, this,
                std::placeholders::_1),
      options);
}

This article will investigate the Reentrant Callback Group, and the following article will explain Mutually Exclusive Callback Group.

ℹ️

The full source code of this implementation can be found here (opens in a new tab).

Concept

If we have create a reentrant callback group and assign callbacks to the group, executors will try to assign the incoming callbacks to available callbacks threads in a FIFO manner as the figure shows.

Can you guess what if there's no available thread? For example, callback processing takes a long time, compared to the speed of queueing. Reading to the end will answer this.

Experiment result

Having the publisher (opens in a new tab) sending topics as below:

Terminal output of publisher node
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester publisher_node
[INFO] [1684145121.677229561] [publisher_node]: Short message (seq=0)
[INFO] [1684145121.777211870] [publisher_node]: Short message (seq=1)
[INFO] [1684145121.877112879] [publisher_node]: Short message (seq=2)
...
[INFO] [1684145122.377120627] [publisher_node]: Short message (seq=7)
[INFO] [1684145122.477210436] [publisher_node]: Short message (seq=8)
[INFO] [1684145122.577112846] [publisher_node]: Short message (seq=9)
[INFO] [1684145122.577202746] [publisher_node]: Long message with more characters (seq=0)
[INFO] [1684145122.677122355] [publisher_node]: Short message (seq=10)
[INFO] [1684145122.777128565] [publisher_node]: Short message (seq=11)
...
[INFO] [1684145123.277118304] [publisher_node]: Short message (seq=16)
[INFO] [1684145123.377120107] [publisher_node]: Short message (seq=17)
[INFO] [1684145123.477122111] [publisher_node]: Short message (seq=18)
[INFO] [1684145123.577089714] [publisher_node]: Short message (seq=19)
[INFO] [1684145123.577184314] [publisher_node]: Long message with more characters (seq=1)
[INFO] [1684145123.677160617] [publisher_node]: Short message (seq=20)
[INFO] [1684145123.777177921] [publisher_node]: Short message (seq=21)

Let us assume that the subscriber takes 2s to process each callback, where it adds stars to the both ends of the message, and id of the processing thread id as:

publisher_node.cc
std::string
MultiThreadReentrantSubscriber::ProcessString(const std::string &raw_string) {
  std::thread::id this_id = std::this_thread::get_id();
  std::ostringstream oss;
  oss << std::this_thread::get_id();
 
  std::this_thread::sleep_for(std::chrono::seconds(2));
  return "** " + raw_string + " **" + " ( by " + oss.str() + ") ";
};
 
void MultiThreadReentrantSubscriber::ShortTopicCallback(
    const std_msgs::msg::String::SharedPtr msg) {
  auto processed_string = ProcessString(msg->data);
  processed_short_string_ = processed_string;
 
  RCLCPP_INFO(get_logger(), "Setting processed:  %s", processed_string.c_str());
  processed_long_string_ = processed_string;
}
 

Then, the result of the callback will be the below:

Terminal output of subscriber node
jbs@jbs:~/ros2_ws$ ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node
[INFO] [1684145123.678675118] [subscriber_node]: Setting processed:  ** Short message (seq=0) ** ( by 140113577211648)
[INFO] [1684145123.778082521] [subscriber_node]: Setting processed:  ** Short message (seq=1) ** ( by 140113859917568)
[INFO] [1684145123.877571624] [subscriber_node]: Setting processed:  ** Short message (seq=2) ** ( by 140113778571008)
[INFO] [1684145123.977534428] [subscriber_node]: Setting processed:  ** Short message (seq=3) ** ( by 140113761785600)
[INFO] [1684145124.078418731] [subscriber_node]: Setting processed:  ** Short message (seq=4) ** ( by 140113770178304)
[INFO] [1684145124.177545434] [subscriber_node]: Setting processed:  ** Short message (seq=5) ** ( by 140113753392896)
[INFO] [1684145124.277588538] [subscriber_node]: Setting processed:  ** Short message (seq=6) ** ( by 140113745000192)
[INFO] [1684145124.377553041] [subscriber_node]: Setting processed:  ** Short message (seq=7) ** ( by 140113728214784)
[INFO] [1684145124.477698345] [subscriber_node]: Setting processed:  ** Short message (seq=8) ** ( by 140113736607488)
[INFO] [1684145124.577653748] [subscriber_node]: Setting processed:  ** Short message (seq=9) ** ( by 140113635960576)
[INFO] [1684145124.577692848] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=0) ** ( by 140113644353280)
[INFO] [1684145124.677715452] [subscriber_node]: Setting processed:  ** Short message (seq=10) ** ( by 140113619175168)
[INFO] [1684145124.777614455] [subscriber_node]: Setting processed:  ** Short message (seq=11) ** ( by 140113627567872)
[INFO] [1684145124.877568058] [subscriber_node]: Setting processed:  ** Short message (seq=12) ** ( by 140113610782464)
[INFO] [1684145124.977619262] [subscriber_node]: Setting processed:  ** Short message (seq=13) ** ( by 140113602389760)
[INFO] [1684145125.077692265] [subscriber_node]: Setting processed:  ** Short message (seq=14) ** ( by 140113585604352)
[INFO] [1684145125.177719769] [subscriber_node]: Setting processed:  ** Short message (seq=15) ** ( by 140113593997056)
[INFO] [1684145125.277602572] [subscriber_node]: Setting processed:  ** Short message (seq=16) ** ( by 140113876702976)
[INFO] [1684145125.377583375] [subscriber_node]: Setting processed:  ** Short message (seq=17) ** ( by 140113976041280)
[INFO] [1684145125.477735379] [subscriber_node]: Setting processed:  ** Short message (seq=18) ** ( by 140113868310272)
[INFO] [1684145125.679380286] [subscriber_node]: Setting processed:  ** Short message (seq=19) ** ( by 140113577211648)
[INFO] [1684145125.778339289] [subscriber_node]: Setting processed:  ** Long message with more characters (seq=1) ** ( by 140113859917568)
[INFO] [1684145125.877828592] [subscriber_node]: Setting processed:  ** Short message (seq=20) ** ( by 140113778571008)
[INFO] [1684145125.977725796] [subscriber_node]: Setting processed:  ** Short message (seq=21) ** ( by 140113761785600)
[INFO] [1684145126.078575999] [subscriber_node]: Setting processed:  ** Short message (seq=22) ** ( by 140113770178304)

Analysis

For the brevity, I denoted the time format 1684145126.078575999 as 26.07.

Reentrant group was using 20 threads and handled events in FIFO.

From the result, you will find that total 20 callback events (from seq = 0 to seq=18 of short_topic (including seq=0 of long_topic)) are handled in distinctive threads and seq=19 of short_topic are assigned to the same thread 140113577211648 with seq=0. Ah! Callbacks of the reentrant callback group are using 20 threads. At this time, there was no round robing as single thread executor.

When no available thread, events in the queue are waiting..

It was almost the case that every time gaps of subscriber callback ending of short_topic was 0.1s which is the exact publishing rate of short_topic. But why 0.2 stride between seq=18 and seq=19 of short_topic?

[INFO] [1684145125.277602572] [subscriber_node]: Setting processed:  ** Short message (seq=16) ** ( by 140113876702976)
[INFO] [1684145125.377583375] [subscriber_node]: Setting processed:  ** Short message (seq=17) ** ( by 140113976041280)
[INFO] [1684145125.477735379] [subscriber_node]: Setting processed:  ** Short message (seq=18) ** ( by 140113868310272)
[INFO] [1684145125.679380286] [subscriber_node]: Setting processed:  ** Short message (seq=19) ** ( by 140113577211648)
 

seq=18 used the last available thread 140113868310272, and seq=19 should wait until the first thread 140113577211648 is freed from seq=0 callback handling.