
Threads, Spinning, and Async in ROS 2

Overview

  • Python ROS 2 nodes (unlike their counterparts in ROS 1) are single-threaded and sequential by default.
  • The lack of threads makes reasoning about ROS 2 python nodes easier, makes them more similar to C++ nodes (which were also single-threaded by default in ROS 1), and provides more control over execution
  • ROS 2 python, however, supports highly configurable modes of execution, which require knowledge of both the python concurrency model and how ROS 2 fits in with it.
  • In most cases, the single-threaded sequential model of ROS 2 makes coding much easier and eliminates whole classes of possible bugs
  • This default model, however, is sometimes overly restrictive and reduces potential performance
  • There is also one common case where sticking with the default model makes coding much more difficult: using service clients.

Service Clients

  • When a service is called, the client needs to wait to get a result
  • Sometimes you do not want to execute further code until the result is received
    • In other words, you want to issue a synchronous call to the service client so it blocks until it receives the result
  • Sometimes, you want to be able to perform useful work while waiting for the result to be received
    • In other words you want to issue an asynchronous call to the service client so that it does not block (is non-blocking)

Steps to Call a Service

  1. Create a service client (call it client) using Node.create_client (these steps are sketched in code after this list)
  2. Wait for the service to become available using client.wait_for_service(...)
  3. Issue a request with client.call_async
    • Synchronous calls are generally not well supported in ROS 2
  4. If you don't want to block
    • Do a little bit of work
    • Check for a result
    • Repeat until a result is ready
  5. If you do want to block, wait for the result until it is ready
    • The Service Tutorial uses rclpy.spin_until_future_complete to accomplish this task
    • But the code in the tutorial WILL NOT WORK if used from within another callback function
    • What is a future anyway? And why is it spinning?
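
A minimal sketch of these steps, using the AddTwoInts service from example_interfaces as a stand-in for a real service. Blocking with rclpy.spin_until_future_complete works here only because the call is issued outside of any callback.

    import rclpy
    from rclpy.node import Node
    from example_interfaces.srv import AddTwoInts  # stand-in service type

    rclpy.init()
    node = Node('client_demo')

    # 1. Create a service client
    client = node.create_client(AddTwoInts, 'add_two_ints')

    # 2. Wait for the service to become available
    while not client.wait_for_service(timeout_sec=1.0):
        node.get_logger().info('waiting for the service...')

    # 3. Issue a request asynchronously
    request = AddTwoInts.Request()
    request.a = 2
    request.b = 3
    future = client.call_async(request)

    # 5. Block until the result is ready (valid here, NOT from within a callback)
    rclpy.spin_until_future_complete(node, future)
    node.get_logger().info(f'sum: {future.result().sum}')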

The rest of these notes address how to easily wait for a service call to complete in ROS 2, while also explaining in some detail the concepts and machinery required to make it happen. For quick instruction see Patterns.

Events

  • External events in ROS include
    • Receiving a message on a topic (subscriber callback)
    • A timer triggering (timer callback: the ROS time is external to the node)
    • Receiving a response to a service request (future object)
    • A parameter changing (the parameter callback)
  • As external events occur, they must be explicitly processed by the node
  • Each ROS node must explicitly process events by "spinning"
    • rclpy.spin_once causes the node to check for and process the next pending event (e.g., by calling a subscriber callback for a received message)
    • rclpy.spin creates an event loop, which repeatedly checks for events and handles them
      • It is essentially an infinite loop that repeatedly calls spin_once()
    • There is also rclpy.spin_until_future_complete which will call spin_once() repeatedly until a specified event occurs.
  • As a general rule (there are exceptions) your node should be organized around a single call to rclpy.spin() with periodic tasks being handled with a timer
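
Conceptually (this is not the actual implementation), rclpy.spin is roughly a loop around spin_once:

    # A sketch of what rclpy.spin(node) does, not the real implementation
    while rclpy.ok():
        rclpy.spin_once(node)  # process the next pending event, if any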

Callbacks

  • A callback is a function (or callable object) that is provided to an object to be called at a later time.
  • In ROS 2, callbacks are explicitly registered when (for example)
    • Creating a subscriber (the callback is called whenever a message is received)
    • A timer is created (the callback is called whenever the timer is triggered)
    • A service server is created (the callback is called whenever somebody calls the server)
  • Callbacks are also implicitly used when a service client calls a service
    • The callback is what receives the response from the service server
  • Callbacks are only called when rclpy.spin_once is called (which can be, and usually is, from within rclpy.spin). Registration is sketched below.
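
A sketch of explicit callback registration (the topic name, message type, and rates are illustrative):

    from std_msgs.msg import String

    # Each of these callbacks runs only while the node is spinning
    node.create_subscription(String, 'chatter', lambda msg: print(msg.data), 10)
    node.create_timer(1.0, lambda: print('tick'))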

Callback Groups

  • By default, ROS 2 allows only one callback function to be pending on the callstack.
    • This behavior means that spinning from within a callback will not allow other callback functions to be called (via rclpy.spin_once).
    • In other words, the callbacks can never be nested.
  • Which callbacks can be called by rclpy.spin_once at any given time is controlled using Callback Groups
  • There are two primary callback groups types: MutuallyExclusiveCallbackGroup and ReentrantCallbackGroup.

MutuallyExclusiveCallbackGroup

  • No more than one callback in the same MutuallyExclusiveCallbackGroup can ever be running concurrently.
  • By default, all callbacks are placed in the same MutuallyExclusiveCallbackGroup.
  • The mutually exclusive property makes it easy to reason about callbacks that may touch the same variables (e.g., global variables or member variables) because callbacks must always run to completion before another one can be called.
    • For example, when a timer is part of a MutuallyExclusiveCallbackGroup you know that the timer callback will always run to completion before it is called again, even if the timer expires sooner or if multiple threads are used
  • If you explicitly spin_once in a callback within a MutuallyExclusiveCallbackGroup, none of the callbacks in that group will be called, even if there is a pending event.
    • Callbacks in other callback groups can, however, be called

ReentrantCallbackGroup

  • All callbacks in a ReentrantCallbackGroup can run concurrently. This means it is possible for a callback to be only partially complete before it is called again
  • If you explicitly spin_once from within a callback that is part of a ReentrantCallbackGroup then any callback in that same group can be called (assigning callbacks to groups is sketched below)
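
Callback groups are assigned when an entity is created. A minimal sketch (the Empty service type and the names are illustrative):

    from rclpy.callback_groups import ReentrantCallbackGroup
    from std_srvs.srv import Empty

    # Inside a Node's __init__: both callbacks share a reentrant group,
    # so they may be on the call stack at the same time
    group = ReentrantCallbackGroup()
    self._client = self.create_client(Empty, 'delay', callback_group=group)
    self._timer = self.create_timer(1.0, self.timer_callback,
                                    callback_group=group)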

ROS and Python Async I/O

Python

  • The python asyncio library, along with the async and await keywords, enables single-threaded cooperative concurrency in python.
  • The heart of any asynchronous (async) program is an event loop. In ROS, the event loop is provided by rclpy.spin.
  • The event loop is responsible for scheduling the execution of Tasks.
  • Tasks are able to run coroutines which are functions that can suspend themselves mid-execution
  • When a coroutine suspends itself mid-execution the event loop can schedule another Task
  • A coroutine is created by declaring a function with the async keyword: async def my_coroutine():
  • A coroutine can suspend itself using the await keyword: this allows it to either
    1. Call another coroutine and wait for it to finish (e.g., await my_coroutine1())
    2. Wait for the result of a Future object to become available.
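
A minimal standalone asyncio example (independent of ROS):

    import asyncio

    async def my_coroutine():     # async def creates a coroutine function
        await asyncio.sleep(1.0)  # suspend, letting other tasks run
        return 'done'

    # asyncio.run provides the event loop (in ROS, rclpy.spin plays this role)
    print(asyncio.run(my_coroutine()))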

A function that may take a long time to compute a result (call it long_function) can instead

  1. Create a Future() object (call it future)
  2. Use a task to schedule a co-routine (call it data_provider) that has access to future and can provide it with the data when that data is ready
  3. Return future to the caller
  4. When the co-routine that calls long_function (call it callback) first gets future, future.done() is False, meaning that data is not available
  5. The calling function needs to somehow wait for future.done() to become True and it can then find the data in future.result()
  6. One way of waiting for future.done() to become true is to await the future
  7. When the co-routine calls await on future it suspends its execution, allowing other co-routines to run, including data_provider
  8. When data_provider gets the data, it provides that data to future
    • Now future.done() will be True and future.result() will contain the data
    • The next time callback is scheduled, it will resume from where it suspended execution via await, and the future will now have data.
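
A sketch of this pattern in plain asyncio, where long_function, data_provider, and callback mirror the steps above:

    import asyncio

    def long_function():
        loop = asyncio.get_running_loop()
        future = loop.create_future()      # 1. create the Future

        async def data_provider():
            await asyncio.sleep(2.0)       # simulate a slow computation
            future.set_result(42)          # 8. provide the data to future

        loop.create_task(data_provider())  # 2. schedule the co-routine
        return future                      # 3. return future to the caller

    async def callback():
        future = long_function()
        assert not future.done()           # 4. the data is not yet available
        data = await future                # 6-7. suspend until future is done
        print(data)                        # future.result() now holds the data

    asyncio.run(callback())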

ROS 2

  • In ROS 2, the event loop is provided by rclpy.spin, and callbacks are scheduled by this event loop
  • Callbacks in ROS 2 can be functions or co-routines.
  • If a callback is a co-routine, it can await on a future
  • For a ROS 2 service client, client.call_async() returns a future
  • So, if a callback is async it can await the result of client.call_async().
    • It will suspend its operation and return control to the event loop (rclpy.spin)
    • If the callback groups are set up correctly, rclpy.spin() will be able to call the service client's callback when the response arrives
    • The future will then be done, and the callback that issued the service request will have the result and be able to continue
    • To get the response from the service use: response = await client.call_async(request)
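
Within a node, that looks roughly like the following sketch (assuming an Empty service client named _client and callback groups arranged as described above):

    from std_srvs.srv import Empty

    async def timer_callback(self):
        # Suspends here, returning control to rclpy.spin, until the
        # response arrives and the future is done
        response = await self._client.call_async(Empty.Request())
        self.get_logger().info('the service call has completed')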

Deadlocks

  • A deadlock occurs when code cannot make progress
  • It is easy to accidentally cause a deadlock (where your program can make no further progress)
  • When using multi-threading, deadlocks can happen in several different ways (e.g., by acquiring and releasing locks out of order)
  • Deadlocks are, unfortunately, possible even with single-threaded concurrency like asyncio
  • In general, with ROS 2, a deadlock will occur if any part of your code depends on a callback running but either
    1. The code is not able to spin
    2. Callbacks A and B are in the same MutuallyExclusiveCallbackGroup and callback A requires callback B to run to make progress
  • The most common source of deadlocks occurs when waiting for a response from a service client (which implicitly has a hidden callback that must happen to receive the response data).
  • By sticking with certain patterns, deadlocks can be avoided.

Patterns

  • The ROS 2 architecture provides a lot of control over concurrency, and generally helps avoid data races (when two different pieces of code try to write to the same memory)
  • While there are many ways of handling deadlocks (such as spinning in the right places and making sure callback groups are set up properly), some patterns can reduce the decisions the programmer needs to make while also simplifying reasoning about the code
  • Here are some guidelines that generally work well and cover probably 80% of cases

Basic ROS 2 Node

  1. Use rclpy.spin in only one place in your program (likely the main entry point)
    • Now you know exactly where the spinning occurs
    • By following the other guidelines you will never need to spin again
    • An exception to this rule is if you need to do some one-time setup when your node starts
      • Then you may want to call services and spin until they are complete
      • Most setup, however, is not one-time. You may want the ability to, for example, reset and re-run the setup.
  2. Structure the main execution of your code around a single timer callback
    • Usually you want to be issuing commands at a single fixed frequency
    • If you have a timer running at X Hz, it's easy to subdivide that into X/N Hz
    • The timer should advance the progress of your algorithm by one time-step (see the skeleton after this list)
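
A skeleton that follows these guidelines (the node name and the 100 Hz rate are illustrative):

    import rclpy
    from rclpy.node import Node

    class Basic(Node):
        def __init__(self):
            super().__init__('basic')
            # Guideline 2: structure execution around a single timer
            self.timer = self.create_timer(1.0/100.0, self.timer_callback)
            self.count = 0

        def timer_callback(self):
            # Advance the algorithm by one time-step. To act at (100/N) Hz,
            # act only when self.count % N == 0.
            self.count += 1

    def main(args=None):
        rclpy.init(args=args)
        # Guideline 1: rclpy.spin is called in exactly one place
        rclpy.spin(Basic())
        rclpy.shutdown()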

Service Clients

Blocking Calls

  1. Blocking calls are the easiest to handle and are preferred unless performance requirements dictate otherwise
  2. Assume that you want to call the service from callback_handler
  3. Make callback_handler an async function.
  4. Put callback_handler in a callback group that is different from that of the service clients
    • They can also all be in the same ReentrantCallbackGroup
    • In general, it is better to have as many callbacks as possible in the same MutuallyExclusiveCallbackGroup, as this is the most restrictive (hence safest) arrangement, but most issues will not arise in a blocking, single-threaded context
  5. Call the service using response = await client.call_async(request), as sketched below
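
A sketch of this blocking pattern (the Empty service type and the 'delay' service name are illustrative):

    import rclpy
    from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
    from rclpy.node import Node
    from std_srvs.srv import Empty

    class BlockingClient(Node):
        def __init__(self):
            super().__init__('blocking_client')
            # The client and the awaiting callback are in different groups, so
            # the client's hidden response callback can run while
            # timer_callback is suspended at the await
            self._client = self.create_client(
                Empty, 'delay',
                callback_group=MutuallyExclusiveCallbackGroup())
            self._timer = self.create_timer(
                1.0, self.timer_callback,
                callback_group=MutuallyExclusiveCallbackGroup())

        async def timer_callback(self):
            response = await self._client.call_async(Empty.Request())
            self.get_logger().info('the delay service returned')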

The walkthrough below illustrates these concepts through three examples related to event loops and asynchronicity, each paired with a specific node implemented in https://github.com/m-elwin/async_experiments.

Non-blocking

  1. For non-blocking service calls you need to implement logic that periodically checks the result of the future returned by call_async
  2. The logic that checks the result should be in a timer that checks the .done() status of the future.
    • Check the future only once per callback invocation: in a single-threaded program its status cannot change in the middle of a callback.
    • Callback groups do not need to be changed because every time the timer callback returns another callback can be executed.
  3. In this pattern, the work is being divided up (by you) into short spurts that can be executed within a timer period
  4. See the FutureClient example in https://github.com/m-elwin/async_experiments.
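
A condensed sketch of this pattern (the actual FutureClient differs in its details; the Empty service type and the 'delay' name are illustrative):

    from rclpy.node import Node
    from std_srvs.srv import Empty

    class PollingClient(Node):
        def __init__(self):
            super().__init__('polling_client')
            self._client = self.create_client(Empty, 'delay')
            self._timer = self.create_timer(0.1, self.timer_callback)
            self._future = None

        def timer_callback(self):
            if self._future is None:
                # Issue the request, but do not wait for the response
                self._future = self._client.call_async(Empty.Request())
            elif self._future.done():  # check exactly once per callback
                self.get_logger().info('response received')
                self._future = None
            # Otherwise, do a short spurt of other useful work and return,
            # which lets the event loop process the pending response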

Walkthrough

These walkthroughs take you through the code in https://github.com/m-elwin/async_experiments in the order in which it is executed and provide a conceptual guide to what is happening.

Deadlock

  1. The deadlock node starts in the async_client.py:deadlock_entry function.
  2. The DeadlockClient constructor (__init__) is called:
    • It creates a service client
    • It creates a timer
  3. rclpy.spin is called, which implements an event loop as follows in pseudocode:

    while Node_Has_Not_Exited:
       Check for an event
       If an event has occurred, call the appropriate callback
    
  4. After some time a timer event occurs and the timer callback (DeadlockClient.timer_callback) is called from the event loop.
  5. The timer callback calls the "delay" service using call_async and retrieves a Future object called future
    • This call sends a request to the node running the "delay" service, so that it may generate a response
    • The future keeps track of the status of the response
    • Initially, the response has not been received, so future.done() returns False
  6. The timer enters a loop that waits for future.done() to return True
  7. Meanwhile, the node implementing the "delay" service does some processing, and sends a response
  8. The DeadlockClient node is still looping, waiting for future.done() to be true
    • It will loop forever, because this loop does not call anything that checks for responses that can change the status of the Future() object so future.done() will always be False.
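
Condensed, the deadlocking callback looks something like this sketch (the Empty service type is illustrative; the real node is in the repository):

    def timer_callback(self):
        future = self._client.call_async(Empty.Request())
        while not future.done():
            pass  # deadlock: nothing in this loop spins, so the hidden
                  # callback that would complete the future never runs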

Await

  1. The await node starts in the async_client.py:await_entry function.
  2. The AwaitClient constructor (__init__) is called:
    • It creates a service client called _client
    • It creates a timer
    • Callback groups are set up so that the service client callback and timer callback can both be on the call stack at the same time
  3. rclpy.spin is called, which implements an event loop as follows in pseudocode:

    while Node_Has_Not_Exited:
       Check for an event
       If an event has occurred, call the appropriate callback
    
  4. After some time a timer event occurs and the timer callback (AwaitClient.timer_callback) is called from the event loop.
  5. The timer callback calls the "delay" service using call_async and retrieves a Future object
    • We will call this object future even though we don't name it, since we await directly on the return value
    • _client also has a reference to future (since it created future it maintains its own reference to it)
    • This call sends a request to the node running the "delay" service, so that it may generate a response
    • The future keeps track of the status of the response
    • This callback calls await on the future object
    • Initially, the response has not been received, so future.done() returns False
  6. Meanwhile, after receiving the request, the node offering the "delay" service begins creating a response
  7. Because the timer_callback used await, it returns control to the event loop. So the await node is now back in rclpy.spin:

    while Node_Has_Not_Exited:
       Check for an event
       If an event has occurred, call the appropriate callback
    
  8. Meanwhile, the node offering the "delay" service finishes its computation and sends its response
    • Event loop receives this event and calls the default callback associated with _client, providing the response data
  9. The _client callback does the following:
    • Store the response in the future
    • Signal that future is done
  10. The event loop is re-entered.
    • Python is now able to resume executing the co-routine where future was awaited (the timer_callback)
  11. The timer callback resumes execution from after the await statement
    • Because the future is done, it knows the service call has completed
    • It finishes and the event loop rclpy.spin() is re-entered

Concurrency

Execution Models

  • In sequential computing, only one computation happens at a given time. The computations happen in sequence.
    • When using a sequential model, callback groups don't matter (unless you explicitly spin_once from within a callback).
      • Unless spin_once is called, nothing attempts to call the callbacks
      • Each callback will run until completion.
  • In parallel computing, multiple computations can happen simultaneously.
    • Callback groups matter, and there are other synchronization issues related to multi-threading
  • In concurrent computing, multiple computations can be in progress at the same time, but they are not executing simultaneously.
  • For example, consider the following chess-playing scenarios
    • A person playing one chess game, followed by another chess game: this is like sequential computation.
    • Two pairs of people playing two chess games simultaneously: this is like parallel computation.
    • One person playing two chess games against two opponents. First the person makes a move in game 1, then makes a move in game 2. Both games are in progress at the same time, but the person is not making a move in each game simultaneously: this is like concurrent computation.

Process

  1. A process is an abstraction used by the Linux Kernel (and other operating systems) to segregate memory address space.
    • Code running in each process thinks it has access to the full memory-address space: in reality the kernel maps a process's memory into physical RAM
    • Code running in each process cannot access memory used by other processes without operating-system intervention, increasing stability and security.
    • Programmers do not need to worry about the intricacies of physical memory: to them memory is just a flat contiguous address space
  2. Every process has a "main" thread, which executes the machine code within the context of the process's memory address space

Thread

  1. A thread is a subordinate unit of execution within a process, and it is what actually executes the machine code
  2. By default each process has a single "main" thread. But it can also create additional threads to run code simultaneously.
    • All threads within a process share the same memory address space
    • Each thread has its own stack (for calling functions and storing local variables)
    • The heap and global variables are shared by threads
    • Unlike processes, threads can directly read and write the memory used by another thread
  3. Each thread is scheduled for execution by the kernel
    • If the machine has multiple CPUs then threads can be executed in parallel
    • If there are more threads than CPUs (as is usually the case), the kernel rapidly switches between threads, executing little bits of code each time
      • In this case the code is running concurrently, but it is fast enough to simulate simultaneity
  4. If two threads access the same memory and are not coordinated properly, a bug called a race condition can occur.
    • A race condition can result in data corruption
    • It can also result in a deadlock, where no thread can continue executing
  5. The exact interleaving of the instructions across multiple threads is non-deterministic because it depends on how the kernel schedules execution, which in turn depends on what else is happening on the computer
    • Non-deterministic bugs in multi-threaded code may be from a race condition
    • Be cautious and have a plan before introducing extra threads into your program
  6. As the operating system switches between threads, it is possible for instructions to be interrupted before completing:
    • Generally, a single python statement maps to multiple machine instructions and thus a statement can be interrupted in the middle of executing
    • Consider i = i + 1. The thread can be interrupted in the middle of reading the value of i, adding 1 to it, or storing the result in i
    • In the meantime, another thread can modify i, leading to incorrect results.
  7. Atomic operations finish executing without interruption. Atomic operations guarantee that a complete result will be computed and seen by all threads.
  8. Synchronization primitives such as a mutex or a semaphore can be used to coordinate execution between threads

Threads in Python

  1. The most common python interpreter, CPython, implements a Global Interpreter Lock (GIL)
    • The GIL simplifies the creation of the interpreter while preserving good single-threaded performance
    • It also severely limits multi-threaded performance in python.
  2. The GIL prevents multiple python bytecode instructions from executing simultaneously, even on a multi-core system.
    • Thus, multi-threading with CPython is like multi-threading on a single-core CPU: the python code is not executed simultaneously
    • Operations on each thread are interleaved (that is, the python interpreter executes some commands on one thread, then switches to another).
    • The order of this interleaving is generally non-deterministic
  3. The GIL does not prevent all race-conditions
    • You still need to synchronize threads that read/write to the same shared variables.
    • Bugs that occur due to specific orderings of operations on multiple threads can still occur
    • Non-atomic python statements can be interrupted before completion
  4. The GIL only applies to python bytecode instructions
    • Python code can call C code, and that C code can bypass the GIL
    • When python waits for a system call (e.g., reading from a file), it is not executing bytecode. Thus, another thread can run while the first thread waits for the system.
    • Thus, multiple threads in python can improve the performance of input/output (I/O) bound programs, but not CPU bound ones
      • CPU bound means performance is limited by the available CPU resources
      • I/O bound means performance is limited by input/output operations (such as reading a file)
  5. Atomic Operations: The following operations in python are guaranteed to complete once started, prior to another thread being run
    • Reading or writing a single variable of a basic type (int, float, string, etc)
    • Assigning an object to a variable (e.g., x = y)
    • Reading an item from a list
    • Modifying an item in a list
    • Getting an item from a dictionary
    • A complete list of python atomic operations

Example

If you perform a non-atomic operation, the thread you are running on can be interrupted in the middle. Assume we have two threads, both performing i = i + 1, a non-atomic operation. What is actually executed gets broken into several interruptible steps:

i = 0
# Two threads are started
Thread 1            |        Thread 2
i = i + 1           |        i = i + 1

Operations can happen in multiple ways, leading to different results. For example:

Thread 1 reads i, it is 0
Thread 2 reads i, it is 0
Thread 1 adds 1 to what it read, yielding 1
Thread 2 adds 1 to what it read, yielding 1
Thread 1 stores 1 in i
Thread 2 stores 1 in i

The final value of i is 1, even though two increments were executed.
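
The following runnable sketch demonstrates the race and its fix with a mutex (whether the unsynchronized version actually loses updates on a given run depends on the interpreter version and thread scheduling):

    import threading

    ITERATIONS = 100_000
    counter = 0
    lock = threading.Lock()

    def unsafe_increment():
        global counter
        for _ in range(ITERATIONS):
            counter = counter + 1  # non-atomic: read, add, store

    def safe_increment():
        global counter
        for _ in range(ITERATIONS):
            with lock:  # the mutex makes the read-modify-write atomic
                counter = counter + 1

    for target in (unsafe_increment, safe_increment):
        counter = 0
        threads = [threading.Thread(target=target) for _ in range(2)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        # safe_increment always yields 200000; unsafe_increment may not
        print(target.__name__, counter)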

Author: Matthew Elwin