How async works in Python - from iterators to asyncio - part 1
Background and introduction
Like a lot of other programmers, I’m often unable to just accept the abstraction. After all, if you don’t mine your own silicon, you don’t really understand what the computer is doing, and you’re not a real programmer. So when I wanted to learn more about async Python, I didn’t want to just learn how to use the asyncio library and its API. I wanted to find out how this stuff really works under the hood.
During my research, I was fortunate to discover the article “How the heck does async/await work in Python 3.5” by Brett Cannon. Brett is a core Python developer and few people know more about Python than he does. The article is very good, and you should read it. But because I’m not as knowledgeable about Python as Brett, there are quite a few things from the article that I needed to do more research on. As I was researching, I started making some notes, and eventually decided that I might want to publish them as my own article. Unfortunately, it’s gotten a bit out of control, so it will be quite long. This is part 1.
Generators, iterators and iterables
When async was first implemented in Python, it utilized a seemingly unrelated feature of the language - generators. Generators are functions which can pause their execution and remember the state they were in when they paused. They tend to be associated with sequences: one of their best-known uses is producing possibly infinite sequences lazily, without using a lot of memory. This makes sense - producing a sequence requires knowing the current element (remembering state) and performing the steps that produce the next element, which can be the body of the function itself.
Typical examples include something like this:
def produce_evens(max: int):
    i = 0
    while i < max:
        yield i
        i += 2
This is actually really similar to what an iterator does. In Python, an iterator is an object which enables iteration using a for loop, by implementing the __next__ method (together with an __iter__ method that returns the iterator itself). A for loop, under the hood, keeps calling the __next__ method of the iterator until the iterator signals the end of the sequence by raising a StopIteration exception. Let’s look at a very simple iterator:
class CountToThree:
    def __init__(self):
        self.n = 0

    def __iter__(self):
        return self

    def __next__(self):
        self.n += 1
        if self.n > 3:
            raise StopIteration
        return self.n

def play_with_custom_iterator():
    to_three_iterator = CountToThree()
    print("Iterating over custom to_three iterator")
    for i in to_three_iterator:
        print(i)
Calling play_with_custom_iterator() prints the numbers 1, 2 and 3, and then the function returns. There is no indication that a StopIteration exception was raised, because the for construct swallows it.
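To make the swallowed exception visible, we can drive the same kind of iterator by hand. This is only a minimal sketch: CountToTwo and drain_manually are hypothetical names I made up for the illustration, not part of the example above.

```python
class CountToTwo:
    """A tiny iterator in the same style as CountToThree (hypothetical name)."""

    def __init__(self):
        self.n = 0

    def __iter__(self):
        return self

    def __next__(self):
        self.n += 1
        if self.n > 2:
            raise StopIteration
        return self.n


def drain_manually(iterator):
    """Call next() until StopIteration is raised, recording every step."""
    events = []
    while True:
        try:
            events.append(next(iterator))
        except StopIteration:
            # A for loop catches this exception silently and simply ends.
            events.append("StopIteration")
            break
    return events


print(drain_manually(CountToTwo()))  # [1, 2, 'StopIteration']
```

The last entry is exactly the event that a for loop hides from us.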
A related concept to an iterator is an iterable. An iterable is an object which is able to produce an iterator, which we can then use to iterate over that object. This includes objects such as list, dict or tuple. To express the difference more clearly, an iterable is “something you can loop over”, while an iterator is “something that remembers where it is”. When we are iterating over a list in Python, we are not actually iterating over the list itself. More precisely, the list produces an iterator, which we can then use to iterate over the list:
def play_with_iterators():
    my_list = [1, 2, 3, 4, 5]
    print("Iterating over my_list a first time")
    for i in my_list:
        print(i)
    print("Iterating over my_list a second time")
    for i in my_list:
        print(i)

    my_iterator = iter(my_list)
    print("Iterating over my_iterator a first time")
    for i in my_iterator:
        print(i)
    print("Iterating over my_iterator a second time")
    # Unlike the above, this won't run at all a second time - we've exhausted the iterator
    for i in my_iterator:
        print(i)

    another_iterator = iter(my_list)
    yet_another_iterator = iter(my_list)
    print("Some values from alternatively calling two instances of iterators over my_list")
    print(next(another_iterator))
    print(next(another_iterator))
    print(next(yet_another_iterator))
    print(next(yet_another_iterator))
    print(next(another_iterator))
    print(next(yet_another_iterator))
    print(next(another_iterator))
    print(next(another_iterator))
    try:
        print(next(another_iterator))
    except StopIteration:
        print("StopIteration occurred while calling `next()` on `another_iterator`")
This code should also be a good illustration of why calling the __iter__() method on an iterable returns an iterator, and calling it on an iterator returns the iterator itself. This way, we can use for with both iterators and iterables. So if you were confused by the implementation of the __iter__ method on the CountToThree class, hopefully it’s a bit clearer now.
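The difference between the two __iter__ behaviours can also be checked directly. The following is a small sketch using only the built-in iter() and hasattr(); the variable names are mine.

```python
numbers = [10, 20, 30]

first_iterator = iter(numbers)
second_iterator = iter(numbers)

# An iterable hands out a new, independent iterator on every iter() call.
independent = first_iterator is not second_iterator

# An iterator returns itself from iter(), which is why a for loop accepts it too.
returns_itself = iter(first_iterator) is first_iterator

# The list has no __next__ method; only the iterator it produces does.
list_has_next = hasattr(numbers, "__next__")
iterator_has_next = hasattr(first_iterator, "__next__")

print(independent, returns_itself, list_has_next, iterator_has_next)
# True True False True
```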
As we can see, iterators and generators are very closely related. Both of them are able to provide values when we ask for them, either using a for loop, or by calling next() on them ‘manually’ (which is what a for loop is doing under the hood). In order to do that, they both have to be able to maintain state in between calls. Iterators do it just by storing some values as attributes on themselves. In generators, it is not as explicit: there is seemingly some ‘magic’ as to where they store their state when they are suspended. We will of course spoil the magic trick later, by examining how exactly the state is stored.
Ranges
The range() function would seem to be another close relative of generators. It is lazy (not all values are generated immediately) and we can iterate over it.
import sys

def lazy_vs_eager():
    eager_list = [i for i in range(1_000_000)]
    print(f"eager_list has length {len(eager_list)}, type {type(eager_list)} and memory size {sys.getsizeof(eager_list)}")
    lazy_range = range(1_000_000)
    print(f"lazy_range has length {len(lazy_range)}, type {type(lazy_range)} and memory size {sys.getsizeof(lazy_range)}")
    lazy_generator = (i for i in range(1_000_000))
    # Can't call len() on generators
    print(f"lazy_generator has length (don't know), type {type(lazy_generator)} and memory size {sys.getsizeof(lazy_generator)}")
Interestingly enough, range() is actually not implemented as a generator. It is not even a function, but a dedicated type, range, and calling it creates a range object. This allows range to offer some features which generators lack, such as len(), indexing, slicing and being able to iterate multiple times. On the other hand, range only allows us to create linear sequences (arithmetic progressions), since start, stop and particularly step must be integers which are fixed when the range is created. You cannot express the idea of a variable step, and therefore you cannot express the idea of a non-linear sequence.
import types

def explore_range_vs_generator() -> None:
    my_range = range(5)
    print(f"my_range is {my_range} of type {type(my_range)}")
    print(f"range itself is {range} of type {type(range)}")
    print(f"is my_range an instance of generator? {isinstance(my_range, types.GeneratorType)}")
    print("Iterating over my_range a first time")
    for i in my_range:
        print(i)
    print("Iterating over my_range a second time")
    for i in my_range:
        print(i)
    print(f"my_range[3] is {my_range[3]}")

    print(f"produce_evens is {produce_evens} of type {type(produce_evens)}")
    my_produce_evens = produce_evens(20)
    print(f"my_produce_evens is {my_produce_evens} of type {type(my_produce_evens)}")
    print(f"is my_produce_evens an instance of generator? {isinstance(my_produce_evens, types.GeneratorType)}")
    print("Iterating over my_produce_evens a first time")
    for i in my_produce_evens:
        print(i)
    print("Iterating over my_produce_evens a second time")
    # No exception here: the generator is exhausted, so the loop body never runs
    try:
        for i in my_produce_evens:
            print(i)
    except Exception as ex:
        print("An exception occurred while iterating a second time: ", ex)

    # Generators don't support indexing, so this raises a TypeError
    try:
        third_element = produce_evens(10)[3]
        print(f"produce_evens(10)[3] is {third_element}")
    except Exception as ex:
        print("An exception occurred while trying to index into the produce_evens: ", ex)

    # A range is an iterable, not an iterator, so next() raises a TypeError
    fresh_range = range(5)
    print("Calling next() on a range")
    try:
        print(next(fresh_range))
    except Exception as ex:
        print("An exception occurred while calling next() on a range: ", ex)

    fresh_generator = produce_evens(21)
    print("Calling next() on a generator")
    try:
        print(next(fresh_generator))
    except Exception as ex:
        print("An exception occurred while calling next() on a generator: ", ex)
So we can see that ranges have some similarities to generators - they are lazy and we can iterate over them. However, there are also differences - generators are actual iterators we can call next() on, while ranges are iterables which support random access via indexing, but raise a TypeError if we call next() on them directly.
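To make the trade-off concrete, here is a short sketch of what a range can do that a generator cannot, next to a non-linear sequence that only a generator can express. The squares generator is a hypothetical example of mine.

```python
def squares(limit: int):
    """Yield 0, 1, 4, 9, ... below `limit`; the gap between values keeps growing."""
    n = 0
    while n * n < limit:
        yield n * n
        n += 1


evens = range(0, 20, 2)

print(evens[3])           # indexing: 6
print(evens[1:4])         # slicing gives another range: range(2, 8, 2)
print(14 in evens)        # membership test: True
print(len(evens))         # 10
print(list(squares(30)))  # [0, 1, 4, 9, 16, 25]
```

None of the first four operations works on the object returned by squares(30), and no choice of start, stop and step makes a range produce the last line.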
More on generators
Sending values in
So far, it seems like generators and iterators are basically the same. Everything we have done so far with a generator, we could do with an iterator as well. But that is not actually the case. Generators offer more features. I think that the main difference is that with generators, we can actually send values into the generator as well:
from collections.abc import Generator

def sequence(max: int, debug: bool = False) -> Generator[int, int | None, None]:
    i = 0
    while i < max:
        if debug:
            print(f"About to yield {i}")
        incoming_msg = yield i
        increment = 2
        if incoming_msg is None:
            if debug:
                print("No one sent us anything :(")
        else:
            if isinstance(incoming_msg, int):
                if debug:
                    print(f"Someone sent us a desired increment, let's increment by {incoming_msg}")
                increment = incoming_msg
            else:
                if debug:
                    print(f"Someone sent us a non-numeric increment {incoming_msg}, let's increment by our default of 2 instead")
        if debug:
            print(f"After yielding, incoming message is {incoming_msg}")
        i += increment

def play_with_sequence() -> None:
    my_gen = sequence(20)
    print(next(my_gen))
    print(next(my_gen))
    print(next(my_gen))
    result = my_gen.send(4)
    print(f"result after sending 4 is {result}")
    result = my_gen.send(8)
    print(f"result after sending 8 is {result}")
    print(next(my_gen))
    # This would raise StopIteration if uncommented
    # print(next(my_gen))
Initially, we had only defined generators using the yield statement. This statement allowed us to pause our generator, and to return some value to the next() call. In the code snippet just above, we defined a generator using a yield expression. The difference is that statements don’t actually produce a value: they do not evaluate to anything, so they cannot be put on the right side of an assignment.
def foo():
    # These two RHS are expressions, they can be on the right side of an assignment
    a = 4 + 2
    b = bar(a)
    # These two RHS are statements, they can NOT be on the right side of an assignment, so these lines don't make any sense
    c = if
    d = return
In the sequence generator in the snippet above, we are using a yield expression. We are putting yield on the right hand side of an assignment. What does it evaluate to? It evaluates to whatever is sent using the .send() method. And what is returned as the result of the .send() method? The value that is yielded from the generator. In fact, functionally, .send(None) and next() are the same. Calling next() gets the next value from the generator, while sending in None. And .send(None) also retrieves the next value from the generator, while sending in None.
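Here is a small sketch of that equivalence, using a hypothetical running_total generator of my own. One copy is driven only with next(), the other only with .send().

```python
def running_total():
    """Yield the running total and add whatever value is sent in."""
    total = 0
    while True:
        received = yield total
        if received is not None:
            total += received


with_next = running_total()
with_send = running_total()

# Both calls run the body up to the first yield and get 0 back.
a = next(with_next)
b = with_send.send(None)

# next() resumes the generator with `received` set to None: total is unchanged.
c = next(with_next)

# send(5) resumes it with `received` set to 5 and returns the next yielded total.
d = with_send.send(5)

# send(None) behaves just like next(): nothing is added.
e = with_send.send(None)

print(a, b, c, d, e)  # 0 0 0 5 5
```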
Priming the generator
However, before we start sending actual values in, we first have to ‘prime the generator’. To ‘prime the generator’ means that we first have to get at least one value from the generator, before starting to send-in actual (non-None) values. If we try something like this:
def yet_another_generator(high: int):
    print("yet_another_generator started")
    i = 0
    while i < high:
        received_msg = yield i
        if received_msg is not None:
            i += received_msg
        i += i + 1

def yet_another_consumer():
    gen = yet_another_generator(100)
    print(f"gen is {gen}, starting it")
    print(gen.send(10))
We get the following error: TypeError: can't send non-None value to a just-started generator. The reason for this is that a freshly created generator hasn’t executed any of its body yet. This is obvious from the fact that the statement “yet_another_generator started” never prints in the above code. And because none of the body of the generator has executed yet, Python doesn’t yet have a yield expression ready to receive the value that we’re sending in. If you read the error message closely, you will notice that what I wrote earlier still holds though - next() is equivalent to .send(None). The error message is complaining about a non-None value specifically. If we changed the above code to send in None, rather than 10, it would execute fine.
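The sketch below shows the failing and the working order side by side. The accumulator generator is a hypothetical example of mine; inspect.getgeneratorstate() is a standard library helper that reports whether a generator has started running.

```python
import inspect


def accumulator():
    """Yield a running total and add whatever is sent in (hypothetical example)."""
    total = 0
    while True:
        received = yield total
        if received is not None:
            total += received


# 1. Sending a real value into a brand-new generator fails.
unprimed = accumulator()
state_before = inspect.getgeneratorstate(unprimed)  # no body code has run yet
try:
    unprimed.send(10)
    error_type = None
except TypeError as ex:
    error_type = type(ex)

# 2. Priming first (send(None) or next()) runs the body up to the first yield.
primed = accumulator()
first = primed.send(None)
state_after = inspect.getgeneratorstate(primed)  # paused at the yield expression
second = primed.send(10)  # now there is a yield expression waiting for the 10

print(state_before, error_type, first, state_after, second)
```

The state goes from GEN_CREATED to GEN_SUSPENDED once the generator is primed, and only a suspended generator has a yield expression that can receive a value.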
Returning values
So far we have seen generators hand values to the caller using the yield keyword in the generator, and iteration in the caller. However, generators can also include return statements, with which they can return a final value once there are no more values to yield. Let’s have a look at an example:
from collections.abc import Generator

def generator_with_return() -> Generator[int, None, int]:
    i = 0
    while i < 11:
        yield i
        i += 2
    return 420

def returning_from_generators():
    gen = generator_with_return()
    print(f"gen is {gen} of type {type(gen)}")
    print("Getting values from generator using for loop")
    for x in gen:
        print(x)

    gen = generator_with_return()
    print("Getting values from generator by calling next()")
    while True:
        try:
            print(next(gen))
        except StopIteration as ex:
            print(f"Got StopIteration with a value of {ex.value}")
            break
In this example, we are first iterating over the generator using a for loop, and then ‘manually’ by calling next() in a while loop. When iterating using the for loop, we only get the values which are yielded by the generator - multiples of 2, starting at 0, up to 10. But we never get the value 420, returned using the return statement. After that, when we iterate by calling next(), we get that same sequence. But after that, the message “Got StopIteration with a value of 420” is printed. As seen in the code, the value returned using the return statement can only be obtained by manually exhausting the generator, thereby causing a StopIteration exception, and examining its .value property. We did not get this value when iterating using the for loop, because the for loop internally catches the StopIteration exception, and lets it pass silently. In fact, if we added a print statement like this:
def generator_with_return() -> Generator[int, None, int]:
    i = 0
    while i < 11:
        yield i
        i += 2
    print("about to return 420")
    return 420
We would see that that message gets printed in both cases - when iterating using the for loop and when iterating using the while loop. It should now be intuitive, from this example, and from the play_with_iterators example, that a for loop in Python actually does something like this under the hood:
# equivalent of `for value in iterable: ...`
iterator = iter(iterable)
while True:
    try:
        value = next(iterator)
    except StopIteration:
        break
    # ... the body of the for loop runs here, with `value` bound ...
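To check that this model behaves like a real for loop, we can wrap it in a function and compare the results. This is only a sketch: manual_for is a hypothetical helper of mine, and it ignores details such as break and the else clause of a for loop.

```python
def manual_for(iterable, body):
    """Mimic `for value in iterable: body(value)` using only iter() and next()."""
    iterator = iter(iterable)
    while True:
        try:
            value = next(iterator)
        except StopIteration:
            break
        body(value)


collected_manually = []
manual_for("abc", collected_manually.append)

collected_with_for = []
for ch in "abc":
    collected_with_for.append(ch)

print(collected_manually, collected_manually == collected_with_for)
# ['a', 'b', 'c'] True
```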
yield from
So far, we have seen generators only use the yield statement and expression. But there is actually another statement and expression which can be used to define generators - yield from. The simplest feature that yield from provides is delegating control to another sub-generator or iterable. It allows us to easily chain generators together:
def sub_generator():
    """Generate Fibonacci numbers"""
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

def dom_generator():
    """Example of what happens when we 'yield from' another generator"""
    yield from sub_generator()

def dumb_generator():
    """What would happen if we only 'yield' rather than 'yield from' another generator"""
    yield sub_generator()

def manual_dom_generator():
    """A simplified model of what 'yield from' does"""
    for x in sub_generator():
        yield x

def sub_and_dom_generator_exploration():
    counter = 0
    upper = 6
    print("Generating from sub_generator")
    for i in sub_generator():
        print(i)
        if counter > upper:
            break
        counter += 1

    counter = 0
    print("Generating from dom_generator")
    for i in dom_generator():
        print(i)
        if counter > upper:
            break
        counter += 1

    counter = 0
    print("Generating from dumb_generator")
    for i in dumb_generator():
        print(i)
        if counter > upper:
            break
        counter += 1

    counter = 0
    print("Generating from manual_dom_generator")
    for i in manual_dom_generator():
        print(i)
        if counter > upper:
            break
        counter += 1
As you can see in this example, if we just yield a generator from another generator, it yields the generator itself, not its values. If we use yield from, then the values themselves are yielded. In this context, we see that yield from sub_generator() is equivalent to for x in sub_generator(): yield x. However, there are other cases where yield from gives us features that can’t be replicated with just a for loop and yield statement. Consider the following example, where we engage in bi-directional communication with sub-generators:
def bi_sub_generator(top: int):
    i = 0
    while i < top:
        increment = 2
        received_msg = yield i
        if received_msg is not None:
            increment = received_msg
        i += increment
    return 326

def bi_dom_generator(top: int):
    """Again, we 'yield from' another generator"""
    final_result = yield from bi_sub_generator(top)
    print(f"bi_dom_generator - final_result is {final_result}")
    return final_result

def bi_dom_manual_generator(top: int):
    """This is actually the whole machinery we need to mimic the power of 'yield from'"""
    sub_gen = bi_sub_generator(top)
    try:
        value = next(sub_gen)
        while True:
            try:
                sent = yield value
                value = sub_gen.send(sent)
            except StopIteration as e:
                final_result = e.value
                print(f"bi_dom_manual_generator - final_result is {final_result}")
                return final_result
    finally:
        sub_gen.close()

def bi_manager(dom_generator):
    """
    Helper driver function, for demonstration
    We will call this with bi_dom_generator and then with bi_dom_manual_generator
    """
    print(next(dom_generator))
    print(next(dom_generator))
    # These values actually get sent all the way to the sub generator
    print(dom_generator.send(5))
    print(dom_generator.send(10))
    print(dom_generator.send(15))
    # we want to exhaust it, to get the final return value, so we must use a while loop, and catch the StopIteration exception.
    print("iterating with while loop")
    while True:
        try:
            print(next(dom_generator))
        except StopIteration as e:
            print(f"Got final return value from dom_generator: {e.value}")
            return e.value

def manual_vs_yield_from_bi_manager():
    print("Running with auto generator")
    auto_generator = bi_dom_generator(50)
    final_result = bi_manager(auto_generator)
    print(f"Got final result from bi_manager using auto generator {final_result}")

    print("Running with manual generator")
    manual_generator = bi_dom_manual_generator(50)
    final_result = bi_manager(manual_generator)
    print(f"Got final result from bi_manager using manual generator {final_result}")
In this example, we are demonstrating all the extra features that yield from gives us, compared to yield. First of all, we can send values from the bi_manager, through the bi_dom_generator all the way to the bi_sub_generator. This really seemed quite magical to me the first time that I encountered it. Secondly, we can get the return value of bi_sub_generator without doing the tedious manual exhaustion and exception-handling. Thirdly, yield from ensures that when the delegating generator is closed, the .close() method gets called on the sub-generator as well.
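The third point is the easiest one to miss, so here is a small sketch of it. All the names are hypothetical ones of mine. We keep our own reference to the sub-generator, close the outer generator after one value, and then ask inspect.getgeneratorstate() what happened to the sub-generator.

```python
import inspect


def inner(log):
    try:
        yield 1
        yield 2
    finally:
        # Runs when this generator is closed while paused at a yield.
        log.append("inner cleaned up")


def delegate_with_yield_from(sub_gen):
    yield from sub_gen


def delegate_with_for_loop(sub_gen):
    for value in sub_gen:
        yield value


def close_after_first_value(make_outer):
    log = []
    sub_gen = inner(log)
    outer = make_outer(sub_gen)
    next(outer)    # both generators are now paused at a yield
    outer.close()  # close only the outer generator
    # Copy the log: sub_gen may still be finalized once this function returns.
    return inspect.getgeneratorstate(sub_gen), list(log)


print(close_after_first_value(delegate_with_yield_from))
# ('GEN_CLOSED', ['inner cleaned up'])
print(close_after_first_value(delegate_with_for_loop))
# ('GEN_SUSPENDED', [])
```

With yield from, closing the outer generator closes the sub-generator immediately. With the plain for loop, the sub-generator is left suspended until something else closes it or it is garbage collected, which is why bi_dom_manual_generator needs its own finally block.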
So how does this relate to async again?
We’re getting there, slowly but surely. I promise (pun intended).
If we think back to the fundamental properties of generators - being able to pause, keep state, and send data back in - you might be starting to realize how this could be useful in the context of asynchronous programming. Being able to pause a function (while it waits for an IO operation to complete), and then resume it, with all of its state stored, is the key for unlocking asynchronous programming.
We also need one other construct - the event loop. The event loop manages what happens when a function suspends. It allows for the running of other functions in the meantime. It also passes data back into suspended coroutines (here, our paused generators). The event loop is like a manager of coroutines.
Let’s look at an example of how we could implement asynchronous programming using generators and our own event loop. Afterwards, we will take a look at the actual Python implementation, and its evolution over time.
This example is very much based on the example in Brett’s article, with very minor modifications.
import datetime
import heapq
import types
import time

class Task:
    """Represent how long a coroutine should wait before starting again.

    Comparison operators are implemented for use by heapq. Two-item
    tuples unfortunately don't work because when the datetime.datetime
    instances are equal, comparison falls to the coroutine and they don't
    implement comparison methods, triggering an exception.

    Think of this as being like asyncio.Task/curio.Task.
    """

    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until

class SleepingLoop:
    """An event loop focused on delaying execution of coroutines.

    Think of this as being like asyncio.BaseEventLoop/curio.Kernel.
    """

    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        # Start all the coroutines.
        for coro in self._new:
            wait_until = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_until, coro))
        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass

def sleep(seconds):
    """Pause a coroutine for the specified number of seconds.

    Think of this as being like asyncio.sleep()/curio.sleep().
    """
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    # Make all coroutines on the call stack pause; the need to use `yield`
    # necessitates this be generator-based and not an async-based coroutine.
    actual = yield wait_until
    # Resume the execution stack, sending back how long we actually waited.
    return actual - now

def countdown(label, length, *, delay=0):
    """Countdown a launch for `length` seconds, waiting `delay` seconds.

    This is what a user would typically write.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = yield from sleep(delay)
    print(label, 'starting after waiting', delta)
    while length:
        print(label, 'T-minus', length)
        waited = yield from sleep(1)
        length -= 1
    print(label, 'lift-off!')

def main():
    """Start the event loop, counting down 3 separate launches.

    This is what a user would typically write.
    """
    loop = SleepingLoop(
        countdown('A', 5),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1),
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)

if __name__ == '__main__':
    main()
If you’re struggling to understand this code, as I did the first time I looked at it, let me give a couple of pointers (pun intended):
- the only part of the code that’s driving the generators is the run_until_complete method. It is making .send() calls, which advance the generator.
- the only value that is yielded in response to the .send() calls is the wait_until value from the sleep function / generator. That is the only part of the code that actually contains a yield expression.
- notice that in the first part of the run_until_complete function (the for loop), we are priming the generators, so that we can then send non-None values into them, as explained earlier
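If it is still hard to see, it can help to play the role of the event loop by hand. The sketch below reuses the same sleep generator as above, but two_naps and drive_without_waiting are hypothetical names of mine. Instead of really sleeping, the driver just pretends that each deadline has arrived and sends it straight back in.

```python
import datetime


def sleep(seconds):
    """Same shape as sleep() above: yield a wake-up time, receive the real time."""
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    actual = yield wait_until
    return actual - now


def two_naps(log):
    """A hypothetical user coroutine: nap twice and record how long each nap took."""
    log.append((yield from sleep(1)))
    log.append((yield from sleep(2)))
    return "done"


def drive_without_waiting(coro):
    """Do what run_until_complete does for one coroutine, minus the waiting."""
    yielded = []
    wait_until = coro.send(None)  # priming: runs up to the yield inside sleep()
    while True:
        yielded.append(wait_until)
        try:
            # Resume the coroutine, claiming that "now" is exactly the deadline.
            wait_until = coro.send(wait_until)
        except StopIteration as stop:
            return yielded, stop.value


log = []
yielded, result = drive_without_waiting(two_naps(log))
print(len(yielded), result, log)
```

The driver only ever sees the two datetime values yielded by sleep(), even though it is talking to two_naps. The values it sends back in travel through yield from into sleep(), which turns them into the timedelta values that end up in the log.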