# Solving Producer/Consumer Problem of Concurrent Programming in Python

## What is the producer/consumer scenario?

Imagine you are at a fast-food chain with an order line and a serving line. The staff in the serving line prepare meals while their coworkers in the order line take orders from customers. In this scenario, the staff in the order line are known as producers: they produce order chits. Their coworkers in the serving line are consumers: they consume order chits and prepare the meals. You can see producers and consumers as two distinct groups of entities, connected by the list of orders (or, in software-engineering terms, a queue).

For human beings, coordinating these two groups of workers (consumers and producers) seems relatively straightforward and intuitive. For computers, unfortunately, that is not the case.

## So, what’s the catch in the producer/consumer problem?

Let’s put our fast-food chain scenario into code:

```python
"""
Implementation 1: Infinite Loop in Consumers
"""

import queue
import threading

orders = queue.Queue()

def serving_line_or_consumer():
    while True:
        new_order = orders.get()
        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()  # Invoke this to indicate the "order" in the Queue is processed

def order_line_or_producer():
    # Each staff in the order line produces 200 orders
    for _ in range(200):
        orders.put("Order")

# Let's put 4 staff into the order line
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]

# Let's assign 6 staff into the serving line
# (daemon=True: these consumers loop forever and would otherwise keep the program alive)
serving_line = [threading.Thread(target=serving_line_or_consumer, daemon=True) for _ in range(6)]

# Put all staff to work
[t.start() for t in order_line]
[t.start() for t in serving_line]

# "join" the orders, block until all orders are cleared
orders.join()

[t.join() for t in order_line]
# Note: we cannot join the serving line -- its threads never exit the infinite loop
```
Let’s focus on the consumer logic in `serving_line_or_consumer()`:

```python
def serving_line_or_consumer():
    while True:  # PROBLEM: Wait for orders
        new_order = orders.get()
        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()  # Invoke this to indicate the "order" in the Queue is processed
```
The consumer is implemented with an infinite `while` loop. With an infinite loop, when there aren’t any orders to be prepared, the consumer thread is busy waiting. If you’re unsure why busy waiting is an anti-pattern: we are simply spinning in a loop, waiting for an event to occur. While busy waiting seems simple and straightforward to implement, it wastes CPU cycles. A comprehensive explanation of busy waiting can be found here.
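A minimal, hypothetical sketch (separate from the article's code) makes the waste visible: a spinning thread re-checks a flag over and over while nothing changes, burning a CPU core for no useful work.

```python
import threading
import time

stop = False
spins = [0]

def busy_wait():
    # Anti-pattern: this loop burns a CPU core re-checking the flag
    while not stop:
        spins[0] += 1   # every iteration is wasted work

t = threading.Thread(target=busy_wait)
t.start()
time.sleep(0.05)   # while we sleep, the spinner keeps churning
stop = True
t.join()
print(spins[0])    # typically a very large number of wasted iterations
```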

## What if we terminate consumers once they have finished preparing all orders?

Let’s put it into code again:

```python
"""
Implementation 2: Use a Sentinel Value to Stop Busy Waiting
"""

import queue
import threading

orders = queue.Queue()

def serving_line_or_consumer():
    has_order = True
    while has_order:
        new_order = orders.get()

        if new_order is None:   # Check for Sentinel Value
            has_order = False

        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()  # Invoke this to indicate the "order" in the Queue is processed

def order_line_or_producer():
    # Each staff in the order line produces 200 orders
    for _ in range(200):
        orders.put("Order")

# Let's put 4 staff into the order line
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]

# Let's assign 6 staff into the serving line
serving_line = [threading.Thread(target=serving_line_or_consumer) for _ in range(6)]

# Put all staff to work
[t.start() for t in order_line]
[t.start() for t in serving_line]

# Wait until the order line has produced everything...
[t.join() for t in order_line]

# ADDED THIS: ...then inform the serving line there are no more orders
[orders.put(None) for _ in range(len(serving_line))]

# "join" the orders, block until all orders are cleared
orders.join()

[t.join() for t in serving_line]
```
In the second implementation, we use a sentinel value, `None`, to inform the consumers when there aren’t any new orders. We added a line to put `None` into `orders`:

`[orders.put(None) for _ in range(len(serving_line))]`

We then updated the consumer logic to check for the sentinel value:

```python
def serving_line_or_consumer():
    has_order = True
    while has_order:
        new_order = orders.get()

        if new_order is None:   # Check for Sentinel Value
            has_order = False

        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()  # Invoke this to indicate the "order" in the Queue is processed
```
Okay, with the new logic, whenever a consumer encounters the sentinel value `None`, it breaks out of the loop. However, the caveat is that we have to explicitly recreate a consumer whenever the system gets new orders after the consumers are terminated. This isn’t really what is desired in our scenario: you wouldn’t want to dismiss the staff in the serving line just because there aren’t any new orders at the moment.
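To make the caveat concrete, here is a hypothetical, simplified sketch (not the article's exact code): once the sentinel-terminated threads have exited, a brand-new batch of threads must be spawned before any later orders can be served.

```python
import queue
import threading

orders = queue.Queue()
served = []

def consumer():
    while True:
        order = orders.get()
        if order is None:       # sentinel: this thread exits for good
            orders.task_done()
            return
        served.append(order)
        orders.task_done()

# The first batch of consumers is dismissed by sentinels...
batch1 = [threading.Thread(target=consumer) for _ in range(2)]
[t.start() for t in batch1]
[orders.put("Order") for _ in range(5)]
[orders.put(None) for _ in batch1]
[t.join() for t in batch1]

# ...so when new orders arrive later, a fresh batch must be created
batch2 = [threading.Thread(target=consumer) for _ in range(2)]
[t.start() for t in batch2]
[orders.put("Order") for _ in range(3)]
[orders.put(None) for _ in batch2]
[t.join() for t in batch2]

print(len(served))  # 8 -- but only because we explicitly recreated the consumers
```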

## A Much Better Solution: Use a Semaphore

In a producer/consumer problem, it is best to use a concurrency control object: a Lock, a Mutex, a Semaphore, etc. If these terms are foreign to you, let’s just focus on the semaphore for now.

What is a Semaphore?

A semaphore is a variable used to control access to a critical section in a multi-threaded or multi-process environment. In such an environment (a program that involves more than one thread or process), a critical section is a piece of code that accesses resources (variables, data, etc.) shared among threads/processes.
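For a concrete (hypothetical) illustration outside the fast-food example, a counter shared by several threads is a classic critical section; a `threading.Lock` ensures only one thread modifies it at a time.

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        with lock:          # only one thread may enter the critical section
            counter += 1

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
[t.start() for t in threads]
[t.join() for t in threads]
print(counter)  # 40000 -- without the lock, lost updates could make it smaller
```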

In Python, `Semaphore` is a class with an internal counter, whose value is always a non-negative integer, and at least two methods: `acquire()` and `release()`. I will explain what `acquire` and `release` do later in this article.
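As a quick (hypothetical) demonstration of the internal counter, `threading.Semaphore` also supports a non-blocking `acquire(blocking=False)` that returns `False` instead of sleeping, which lets us watch the counter go up and down:

```python
import threading

sem = threading.Semaphore(1)        # internal counter starts at 1

print(sem.acquire())                # True: counter 1 -> 0
print(sem.acquire(blocking=False))  # False: counter is zero, don't sleep
sem.release()                       # counter 0 -> 1
print(sem.acquire(blocking=False))  # True: counter 1 -> 0
```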

In our fast-food chain scenario, the use of a semaphore is not for controlling access to shared resources, but for allowing consumers to sleep while waiting for an event. Let’s put a semaphore into our scenario:

```python
"""
Implementation 3: Use Semaphores
"""

import queue
import threading

orders = queue.Queue()
has_order = threading.Semaphore(0)  # ADDED THIS: internal counter starts at zero

def serving_line_or_consumer():
    while has_order.acquire():  # ADDED THIS: Acquire the Semaphore, or sleep until its counter is larger than zero
        new_order = orders.get()
        # prepare meals from `new_order`, assuming GIL is released while preparing meals
        orders.task_done()  # Invoke this to indicate the "order" in the Queue is processed

def order_line_or_producer():
    # Each staff in the order line produces 200 orders
    for _ in range(200):
        orders.put("Order")
        has_order.release()  # ADDED THIS: Release the Semaphore, increment the internal counter by 1

# Let's put 4 staff into the order line
order_line = [threading.Thread(target=order_line_or_producer) for _ in range(4)]

# Let's assign 6 staff into the serving line
# (daemon=True: these consumers stay asleep in acquire() once all orders are served)
serving_line = [threading.Thread(target=serving_line_or_consumer, daemon=True) for _ in range(6)]

# Put all staff to work
[t.start() for t in order_line]
[t.start() for t in serving_line]

# "join" the orders, block until all orders are cleared
orders.join()

[t.join() for t in order_line]
# Note: we don't join the serving line -- its threads stay asleep, waiting for future orders
```
In the latest implementation, the consumers wait to acquire a semaphore. The simplified logic of `acquire()` would be:

```
If the internal counter is zero:
    allow the thread to sleep while waiting for the counter to be larger than zero
    once the internal counter is larger than zero, decrement it by 1
    return True

If the internal counter is larger than zero:
    decrement the internal counter by 1
    return True
```
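The blocking path of that pseudocode can be observed directly. In this hypothetical sketch, a thread calls `acquire()` on a semaphore whose counter is zero, sleeps (instead of spinning), and wakes only when another thread calls `release()`:

```python
import threading
import time

sem = threading.Semaphore(0)   # counter starts at zero
events = []

def waiter():
    sem.acquire()              # counter is zero: sleep here, no spinning
    events.append("consumer woke up")

t = threading.Thread(target=waiter)
t.start()
time.sleep(0.1)                # the waiter is asleep inside acquire()
events.append("producer releasing")
sem.release()                  # counter 0 -> 1: the sleeping waiter wakes up
t.join()
print(events)  # ['producer releasing', 'consumer woke up']
```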
With `acquire()`, our consumers will be idle when there isn’t any order (i.e., when the internal counter of the semaphore is zero).

In our case, we updated `order_line_or_producer()` to make use of the semaphore as well. When a producer receives an order, it releases the semaphore. In short, `release()` doesn’t do anything but increment the internal counter of the semaphore.

With the semaphore being incremented, `acquire()` will then act accordingly. This is a much better solution than the previous two. With a semaphore, consumers are put to sleep while they wait for new orders, and producers inform consumers by incrementing the semaphore.

## Conclusion

In this article, we solved the producer/consumer problem with 3 different implementations:

1. using an infinite loop to wait for events
2. using a sentinel value to terminate consumers when they are not needed temporarily
3. (the better solution) using a concurrency control object: a semaphore to allow consumers to sleep while waiting

Among these, a concurrency control object is much preferred. In this scenario, we used a semaphore; in general, there are other control objects besides semaphores. I highly recommend you explore them on your own.
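As one example of another control object worth exploring, `threading.Event` lets a thread sleep until another thread signals it, which suits simple one-shot notifications (a hypothetical sketch, not from the article's code):

```python
import threading

shutdown = threading.Event()
log = []

def worker():
    # Sleeps inside wait() instead of spinning; wakes when the event is set
    shutdown.wait()
    log.append("worker shutting down")

t = threading.Thread(target=worker)
t.start()
shutdown.set()   # signal the worker; no polling, no sentinel in a queue
t.join()
print(log)  # ['worker shutting down']
```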

Anecdote: In Private File Saver, I implemented a desktop client to sync local files to an AWS S3 bucket in Python. The performance was satisfactory until I had to sync more than 12,000 files, which warranted a rewrite of the logic to be more efficient. One of the obvious improvements was implementing a producer/consumer solution. (The details of this problem can be found here.)