Threads, Spinning, and Async in ROS 2
Overview
- Python ROS 2 nodes (unlike their counterparts in ROS 1) are single-threaded and sequential by default.
- Lack of threads makes reasoning about ROS 2 python nodes easier, makes them more similar to C++ nodes (which were single-threaded in ROS 1) and also provides more control over execution
- ROS 2 python, however, supports highly configurable modes of execution, which require knowledge of both the python concurrency model and how ROS 2 fits in with it.
- In most cases, the single-threaded sequential of ROS 2 makes coding much easier and eliminates whole classes of possible bugs
- This default model, however, is sometimes overly restrictive and reduces potential performance
- There is also one common case where sticking with the default model makes coding much more difficult: using service clients.
Service Clients
- When a service is called, the client needs to wait to get a result
- Sometimes you do not want to execute further code until the result is received
- In other words, you want to issue a synchronous call to the service client so it blocks until it receives the result
- Sometimes, you want to be able to perform useful work while waiting for the result to be received
- In other words you want to issue an asynchronous call to the service client so that it does not block (is non-blocking)
Steps to Call a Service
- Create a service client (call it
client
) usingNode.create_client
- Wait for the service to become available using
client.wait_for_service(...)
- Issue a request with
client.call_async
- Synchronous calls are generally not well supported in ROS 2
- If you don't want to block
- Do a little bit of work
- Check for a result
- Repeat until a result is ready
- If you do want to block, wait for the result until it is ready
- The Service Tutorial uses
rclpy.spin_until_future_complete
to accomplish this task - But the code in the tutorial WILL NOT WORK if used from within another callback function
- What is a future anyway? And why is it spinning?
- The Service Tutorial uses
The rest of these notes address how to easily wait for a service call to complete in ROS 2, while also explaining in some detail the concepts and machinery required to make it happen. For quick instruction see Patterns.
Events
- External events in ROS include
- Receiving a message on a topic (subscriber callback)
- A timer triggering (timer callback: the ROS time is external to the node)
- Receiving a response to a service request (future object)
- A parameter changing (the parameter callback)
- As external events occur, they must be explicitly processed by the node
- Each ROS node must explicitly process events by "spinning"
rclpy.spin_once
causes the node to check for and process the next pending event (e.g., by calling a subscriber callback for a received message)rclpy.spin
creates an event loop, which repeatedly checks for events and handles them- It is essentially an infinite loop that repeatedly calls
spin_once()
- It is essentially an infinite loop that repeatedly calls
- There is also
rclpy.spin_until_future_complete
which will callspin_once()
repeatedly until a specified event occurs.
- As a general rule (there are exceptions) your node should be organized around a single call to
rclpy.spin()
with periodic tasks being handled with a timer
Callbacks
- A callback is a function (or callable object) that is provided to an object to be called at a later time.
- In ROS 2, callbacks are explicitly registered when (for example)
- Creating a subscriber (the callback is called whenever a message is received)
- A timer is created (the callback is called whenever the timer is triggered)
- A service server is created (the callback is called whenever somebody calls the server)
- Callbacks are also implicitly used when a service client calls a service
- The callback is what receives the response from the service server
- Callbacks are only called whenever
rcply.spin_once
is called (which can be and usually is from withinrclpy.spin
).
Callback Groups
- By default, ROS 2 only allows only one callback function to be pending on the callstack.
- This behavior means that spinning from within a callback will not allow other callback functions to be called (via
rclpy.spin_once
). - In other words, the callbacks can never be nested.
- This behavior means that spinning from within a callback will not allow other callback functions to be called (via
- The behavior of what callbacks can be called by
rclpy.spin_once
when is controlled using Callback Groups - There are two primary callback groups types:
MutuallyExclusiveCallbackGroup
andReentrantCallbackGroup
.
MutuallyExclusiveCallbackGroup
- No more than one callback in the same
MutuallyExclusiveCallbackGroup
can ever be running concurrently. - By default, all callbacks are placed in the same
MutuallyExclusiveCallbackGroup
. - The mutually exclusive property makes it easy to reason about callbacks that may touch the same variables (e.g., global variables or member variables) because
callbacks must always run to completion before another one can be called.
- For example, when a
timer
is part of aMutuallyExclusiveCallbackGroup
you know that the timer callback will always run to completion before it is called again, even if the timer expires sooner or if multiple threads are used
- For example, when a
- If you explicitly
spin_once
in a callback within aMutuallyExclusiveCallbackGroup
none of the callbacks in that group will be called, even if there is a pending event.- Callbacks in other callback groups can, however, be called
ReentrantCallbackGroup
- All callbacks in a
ReentrantCallbackGroup
can run concurrently, this means that it is possible for a callback to be only partially complete before it is called again - If you explicitly
spin_once
from within a callback that is part of aReentrantCallbackGroup
then any callback in that same group can be called
ROS and Python Async I/O
Python
- The python asyncio library, along with the
async
andawait
keywords, enables single-threaded cooperative concurrency in python. - The heart of any asynchronous (async) program is an event loop. In ROS, the event loop is provided by
rclpy.spin
. - The event loop is responsible for scheduling the execution of Tasks.
- Tasks are able to run coroutines which are functions that can suspend themselves mid-execution
- When a coroutine suspends itself mid-execution the event loop can schedule another Task
- A coroutine is created by declaring a function with the
async
keyword:async def my_coroutine():
- A coroutine can suspend itself using the
await
keyword: this allows it to either- Call another coroutine and wait for it to finish (e.g.,
await my_couroutine1()
) - Wait for the result of a Future object to become available.
- Call another coroutine and wait for it to finish (e.g.,
A function that may take a long time to compute a result (call it long_function
) can instead
- Create a
Future()
object (call itfuture
) - Use a task to schedule a co-routine (call it
data_provider
) that has accessfuture
and can provide it with the data when that data is ready - Return
future
to the caller - When the co-routine that calls
long_function
(call itcallback
) first getsfuture
,future.done()
isFalse
, meaning that data is not available - The calling function needs to somehow wait for
future.done()
to becomeTrue
and it can then find the data infuture.result()
- One way of waiting for
future.done()
to become true is toawait
thefuture
- When the co-routine calls
await
onfuture
it suspends its execution, allowing other co-routines to run, includingdata_provider
- When
data_provider
gets the data, it provides that data tofuture
- Now
future.done()
will be True andfuture.result()
will contain the data - The next time
callback
is scheduled, it will resume from where it suspended execution viaawait
, and thefuture
will now have data.
- Now
ROS 2
- In ROS 2, the event loop is provided by
rclpy.spin
, and callbacks are scheduled by this event loop - Callbacks in ROS 2 can be functions or co-routines.
- If a callback is a co-routine, it can
await
on a future - For a ROS 2 service client,
client.call_async()
returns a future - So, if a callback is
async
it canawait
the result ofclient.call_async()
.- It will suspend its operation and return control to the event loop (
rclpy.spin
) - If the callback groups are set up correctly,
rclpy.spin()
will be able to call the server client callback when the response arrives - The future will then be done, and the callback that issued the service request will have the result and be able to continue
- To get the response from the service use:
response = await client.call_async()
- It will suspend its operation and return control to the event loop (
Deadlocks
- A deadlock occurs when code cannot make progress
- It is easy to accidentally cause a deadlock (where your program can make no further progress)
- When using multi-threading, deadlocks can happen in several different ways (e.g., by acquiring and releasing locks out of order)
- Deadlocks are, unfortunately possible even with single-threaded concurrency like
asyncio
- In general, with ROS 2, a deadlock will occur if any part of your code depends on a
callback
running but either- The code is not able to
spin
- Callback's
A
andB
are in the sameMutuallyExclusiveCallbackGroup
and callbackA
requires callbackB
to run to make progress
- The code is not able to
- The most common source of
deadlocks
occurs when waiting for a response from a serviceclient
(which implicitly has a hidden callback that must happen to receive the response data). - By sticking with certain patterns, deadlocks can be avoided.
Patterns
- The ROS 2 architecture provides a lot of control over concurrency, and generally helps avoid data races (when two different pieces of code try to write to the same memory)
- While there are many ways of handling the issues of deadlocks (such as spinning in the right places and making sure callback groups are setup properly) some patterns can help reduce the decisions that the programmer needs to make while also simplifying the reasoning about code
- Here are some general guidelines that generally work well and cover probably 80% of cases
Basic ROS 2 Node
- Use
rclpy.spin
in only one place in your program (likely the main entry point)- Now you know exactly where the spinning occurs
- By following the other guidelines you will never need to spin again
- An exception to this rule is if you need to do some one-time setup when your node starts
- Then you may want to call services and spin until they are complete
- Most setup, however, is not one-time. You may want the ability to, for example, reset and re-run the setup.
- Structure the main execution of your code around a single timer callback
- Usually you want to be issuing commands at a single fixed frequency
- If you have a timer running at
X hz
, it's easy to subdivide that intoX/N hz
- The timer should advance the progress of your algorithm by one time-step
Service Clients
Blocking Calls
- Blocking calls are the easiest to handle and are preferred unless performance requirements dictate otherwise
- Assuming that you want to call the service from
callback_handler
- Make
callback_handler
anasync
function. - Put
callback_handler
in a callback group that is different than that of the service clients- They can also all be in the same
ReentrantCallbackGroup
- In general, it is better to have as many callbacks in the same
MutuallyExclusiveCallbackGroup
as possible, as this is the most restrictive (hence safest), but most issues will not arise in a blocking, single-threaded context
- They can also all be in the same
- Call the services using
await client.call_async()
This walkthrough provides a guide by walking through three examples related to event loops and asynchronicity. They are paired with specific nodes implemented in [[
- See the "await" example of https://github.com/m-elwin/async_experiments
- The example uses a
ReentrantCallbackGroup
for everything, but also provides an explanation about how to useMutuallyExclusiveCallbackGroups
Non-blocking
- For non-blocking service calls you need to implement logic that periodically checks the result of the future returned by
call_async
- The logic that checks the result should be in a timer that checks the
.done()
status of the future.- It is crucial that the future is only checked one time per callback. It cannot change in the middle of the callback in a single-threaded program.
- Callback groups do not need to be changed because every time the timer callback returns another callback can be executed.
- In this pattern, the work is being divided up (by you) into short spurts that can be executed within a timer period
- See the FutureClient example in https://github.com/m-elwin/async_experiments.
Walkthrough
These walkthroughs take you through the code in https://github.com/m-elwin/async_experiments in the order in which it is executed and provide a conceptual guide to what is happening.
Deadlock
- The deadlock node starts in the
async_client.py:deadlock_entry
function. - The
DeadlockClient
constructor (__init__
) is called:- It creates a service client
- It creates a timer
rclpy.spin
is called, which implements an event loop as follows in pseudocode:while Node_Has_Not_Exited: Check for an event If an event has occured, call the appropriate callback
- After some time a timer event occurs and the timer callback (
DeadlockClient.timer_callback
) is called from the event loop. - The timer callback calls the "delay" service using
call_async
and retrieves aFuture
object calledfuture
- This call sends a request to the node running the "delay" service, so that it may generate a response
- The future keeps track of the status of the response
- Initially, the response has not been received, so
future.done()
returns False
- The timer enters a loop that waits for
future.done()
to return true - Meanwhile, the node implementing the "delay" service does some processing, and sends a response
- The
DeadlockClient
node is still looping, waiting forfuture.done()
to be true- It will loop forever, because this loop does not call anything that checks for responses that can change the status of the
Future()
object sofuture.done()
will always beFalse
.
- It will loop forever, because this loop does not call anything that checks for responses that can change the status of the
Await
- The await node starts in the
async_client.py:await_entry
function. - The
AwaitClient
constructor (__init__
) is called:- It creates a service client called
_client
- It creates a timer
- Callback groups are setup so that the service client callback and timer callback can both be on the call stack at the same time
- It creates a service client called
rclpy.spin
is called, which implements an event loop as follows in pseudocode:while Node_Has_Not_Exited: Check for an event If an event has occured, call the appropriate callback
- After some time a timer event occurs and the timer callback (
AwaitClient.timer_callback
) is called from the event loop. - The timer callback calls the "delay" service using
call_async
and retrieves aFuture
object- We will call this object
future
even though we don't name it as we are awaiting directly on the return value _client
also has a reference tofuture
(since it createdfuture
it maintains its own reference to it)- This call sends a request to the node running the "delay" service, so that it may generate a response
- The future keeps track of the status of the response
- This callback calls
await
on the future object - Initially, the response has not been received, so
future.done()
returns False
- We will call this object
- Meanwhile, after receiving the request, the node offering the "delay" service begins creating a response
Because the
timer_callback
usedawait
, it returns control to the event loop. So theawait node is now back in =rclpy.spin
while Node_Has_Not_Exited: Check for an event If an event has occured, call the appropriate callback
- Meanwhile, the node offering the "delay" service finishes its computation and sends it's response
- Event loop receives this event and calls the default callback associated with
_client
, providing the response data
- Event loop receives this event and calls the default callback associated with
- The
_client.callback
does the following:- Store the response in the
future
- Signal that
future
is done
- Store the response in the
- The event loop is re-entered.
- Python is now able to resume executing the co-routine where
future
wasawaited
(thetimer_callback
)
- Python is now able to resume executing the co-routine where
- The timer callback resumes execution from after the
await
statement- Because the
future
isdone()
it knows the service call has completed - It finishes and the event loop
rclpy.spin()
is re-entered
- Because the
Concurrency
Execution Models
- In sequential computing, only one computation happens at a given time. The computations happen in sequence.
- When using a sequential model, callback groups don't matter (unless you explicitly
spin_once
from within a callback).- Unless
spin_once
is called, nothing attempts to call the callbacks - Each callback will run until completion.
- Unless
- When using a sequential model, callback groups don't matter (unless you explicitly
- In parallel computing, multiple computations can happen simultaneously.
- Callback groups matter, and there are other synchronization issues related to multi-threading
- In concurrent computing, multiple computations can be pending at the same time, but are not executing sequentially.
- For example, consider the following chess-playing scenarios
- A person playing one chess game, followed by another chess game: this is like sequential computation.
- Two pairs of people playing two chess games simultaneously: this is like parallel computation.
- One person playing two chess games against two opponents. First the person makes a move in game 1, then makes a move in game 2. Both games are in progress at the same time, but the person is not making a move in each game simultaneously.
Process
- A process is an abstraction used by the Linux Kernel (and other operating systems) to segregate memory address space.
- Code running in each process thinks it has access to the full memory-address space: in reality the kernel maps a processes memory into the physical ram
- Code running in each process cannot access memory used by other processes without operating-system intervention, increasing stability and security.
- Programmers do not need to worry about the intricacies of physical memory: to them memory is just a flat contiguous address space
- Every process has a "main" thread, which executes the machine code within the context of the process's memory address space
Thread
- A thread is subordinate unit of execution within a process, and it is what actually executes the machine code
- By default each process has a single "main" thread. But it can also create additional threads to run code simultaneously.
- All threads within a process share the same memory address space
- Each thread has its own stack (for calling functions and storing local variables)
- The heap and global variables are shared by threads
- Unlike processes, threads can directly read and write the memory used by another thread
- Each thread is scheduled for execution by the kernel
- If the machine has multiple CPUs then threads can be executed in parallel
- If there are more threads than CPUs (as is usually the case), the kernel rapidly switches between threads, executing little bits of code each time
- In this case the code is running concurrently, but it is fast enough to simulate simultaneity
- If two threads access the same memory and are not coordinated properly, a bug called a race condition can occur.
- A race condition can result in data corruption
- It can also result in a deadlock, where no thread can continue executing
- The exact interleaving of the instructions across multiple threads is non-deterministic because it depends on how the kernel
schedules execution, which in turn depends on what else is happening on the computer
- Non-deterministic bugs in multi-threaded code may be from a race condition
- Be cautious and have a plan before introducing extra threads into your program
- - As the operating system switches between threads, it is possible for instructions to be interrupted before completing:
- Generally, a single python statement maps to multiple machine instructions and thus a statement can be interrupted in the middle of executing
- Consider
i = i + 1
. The thread can be interrupted in the middle of reading the value ofi
, adding1
to it, or storing the result ini
- In the meantime, another thread can modify
i
, leading to incorrect results.
- Atomic operations finish executing without interruption. Atomic operations guarantee that a complete result will be computed and seen by all threads.
- Synchronization primitives such as a mutex or a semaphore can be used to coordinate execution between threads
Threads in Python
- The most common python interpreter, CPython, implements a Global Interpreter Lock (GIL)
- The GIL simplifies the creation of the interpreter while preserving good single-threaded performance
- It also severely limits multi-threaded performance in python.
- The GIL prevents multiple python instructions from executing simultaneously on a multi-core system.
- Thus, multi-threading with CPython is like multi-threading on a single-core CPU: the python code is not executed simultaneously
- Operations on each thread are interleaved (that is, the python interpreter executes some commands on one thread, then switches to another).
- The order of this interleaving is generally non-deterministic
- The GIL does not prevent all race-conditions
- You still need to synchronize threads that read/write to the same shared variables.
- Bugs that occur due to specific orderings of operations on multiple threads can still occur
- Non-atomic python statements can be interrupted before completion
- The GIL only applies to python bytecode instructions
- Python code can call C code, and that C code can bypass the GIL
- Python waits for a system function (e.g., to reading from a file), it is not executing bytecode. Thus, another thread can run while the other thread waits for the system.
- Thus, multiple-threads in python can improve performance of input/output (I/O) bound (but not CPU bound) python programs but not
- CPU bound means performance is limited by the available CPU resources
- I/O bound means performance is limited by input/output operations (such as reading a file)
- Atomic Operations: The following operations in python are guaranteed to complete once started, prior to another thread being run
- Reading or writing a single variable of a basic type (int, float, string, etc)
- Assigning an object to a variable (e.g., x = y)
- Reading an item from a list
- Modifying an item in a list
- Getting an item from a dictionary
- A complete list of python atomic operations
Example
If you perform a non-atomic operation, the thread you are running on can be interrupted in the middle.
Assume we have two threads, both performing i = i + 1
, a non-atomic operation. Thus what is actually executed gets broken into several interruptable steps
i = 0 # Two threads are started Thread 1 | Thread 2 i = i + 1 | i = i + 1
Operations can happen in multiple ways leading to different results: for example
Thread 1, reads i, it is 0 Thread 2, reads i, it is 0 Thread 1 adds 1 to what it read, yielding 1 Thread 1 stores 1 in i Thread 1 reads i it is 1 Thread 1 adds 1 to what it read, yielding 2 Thread 2 adds 1 to what it read, yielding 1 Thread 1 stores 2 in i Thread 2 stores 1 in i