Queues are a useful data structure in programming that allow you to add and remove elements in a first in, first out (FIFO) order. Python provides a built-in module called queue that implements different types of queue data structures. In this comprehensive guide, we will cover everything you need to know about using queues in Python.
What is a Queue?
A queue is a linear data structure that follows the FIFO principle — the first element added to the queue is the first one to be removed. This is analogous to a physical queue of people waiting in line — the first person in line is the first to be served.
The main operations of a queue are:
- Enqueue — Add an element to the end of the queue
- Dequeue — Remove an element from the front of the queue
- IsEmpty — Check if the queue is empty
- IsFull — Check if the queue is full
- Peek — Get the value of the front element without removing it
Queues maintain a logical order of elements and provide efficient insertion and deletion. They can be implemented using arrays, linked lists, stacks, or dequeues.
Some common uses of queues are:
- Job scheduling — Tasks get added to a queue and processed in order
- Print spoolers — Print jobs get spooled in a queue before being printed
- Keyboard buffer — Keystrokes get added to a queue before being processed
- Web server request queue — Requests get queued before being handled
- Queues provide first-in, first-out behavior that is very useful in programming for certain situations.
Queue Implementation in Python
Python provides a queue module in the standard library that has different queue implementations for programmers to use in their applications.
The key classes implemented in the queue module are:
- Queue — A FIFO queue
- LifoQueue — A LIFO queue or stack
- PriorityQueue — A priority queue that retrieves elements based on priority
- deque — A double-ended queue that allows adding/removing from both ends
Let’s explore how to use these classes to implement queues in Python:
Queue
Queue implements a basic FIFO queue. You can initialize a Queue instance like this:
from queue import Queue
q = Queue()
# The key methods available are:
qsize() # - Get the size of the queue
empty() # - Check if queue is empty
full() # - Check if queue is full
put(item) # - Put an item into the queue
get() # - Remove and return an item from the queue
join() # - Block until all tasks are processed
Here is an example of using Queue:
from queue import Queue
q = Queue()
q.put(1) # Add 1 to queue
q.put(2)
q.put(3)
print(q.qsize()) # Prints 3
print(q.get()) # Prints 1
print(q.get()) # Prints 2
Queue is thread and process safe. You can use it for inter-thread/process communication.
LifoQueue
LifoQueue implements a last in, first out (LIFO) queue or stack. It has methods like:
qsize()
empty()
full()
put(item)
get()
Here is an example of using LifoQueue as a stack:
from queue import LifoQueue
stack = LifoQueue()
stack.put(1)
stack.put(2)
stack.put(3)
print(stack.get()) # Prints 3
print(stack.get()) # Prints 2
PriorityQueue
A PriorityQueue retrieves entries sorted by priority level. Elements with the lowest priority are retrieved first.
The priority can be set by the optional priority argument to put() or using a tuple (priority_number, data).
Example:
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((3, 'C'))
pq.put((1, 'A'))
pq.put((2, 'B'))
print(pq.get()) # Prints ('A', 1)
print(pq.get()) # Prints ('B', 2)
print(pq.get()) # Prints ('C', 3)
deque
deque stands for “double-ended queue” and allows efficient append/pop from both ends. It implements these methods:
append(item) # - Add to right end
appendleft(item) # - Add to left end
pop() # - Remove from right end
popleft() # - Remove from left end
Example usage:
from collections import deque
dq = deque()
dq.append(1)
dq.append(2)
dq.appendleft(3)
print(dq) # deque([3, 1, 2])
dq.pop() # 2
dq.popleft() # 3
deque provides a thread-safe version called deque.deque that can be used in multi-threaded programs.
Implementing a Queue in Python
In addition to the built-in data structures, you can also implement a custom queue class in Python. Here is one way to implement a basic queue:
class Queue:
def **init**(self):
self.items = []
def size(self):
return len(self.items)
def enqueue(self, item):
self.items.append(item)
def dequeue(self):
if self.size() == 0:
return None
return self.items.pop(0)
To use it:
q = Queue()
q.enqueue(1)
q.enqueue(2)
q.enqueue(3)
print(q.size()) # 3
print(q.dequeue()) # 1
print(q.dequeue()) # 2
This provides a simple queue implementation to enqueue and dequeue elements in Python.
Multiprocessing with Queue
The Queue class provided by the queue module can be used for safely exchanging objects between multiple threads or processes. This allows you to build multi-producer, multi-consumer queues in Python.
Here is an example of how to use a Queue for inter-process communication:
from multiprocessing import Process, Queue
def worker(q):
print(q.get()) # Get item from queue
q.put('Result') # Put result back
if __name__ == '__main__':
q = Queue()
q.put('Input')
p = Process(target=worker, args=(q,))
p.start()
p.join()
print(q.get()) # Print result
This shows how you can pass data between processes safely using a shared Queue. The queue handles the synchronization required between processes.
Queue Methods in Threading Module
The threading module also provides the following queue classes:
queue.Queue # - A thread-safe FIFO queue
queue.LifoQueue # - A thread-safe LIFO queue
queue.PriorityQueue # - A thread-safe priority queue
These have a similar interface to the queue module but are designed to be used in multi-threaded programs.
Example:
from threading import Thread
from queue import Queue
def worker(q):
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
q = Queue()
for i in range(10):
q.put(i)
thread = Thread(target=worker, args=(q,))
thread.start()
q.join() # Block until all tasks are done
This shows how Queue can be used to coordinate work between threads.
Use Cases for Queues in Python
Some examples of where queues are commonly used in Python:
- Asynchronous task queues — Use a queue to store tasks to be processed asynchronously by workers
- Job schedulers — A queue can store jobs to be executed by multiple threads/processes
- Smooth data streaming — Use a queue to buffer incoming data chunks for smooth processing
- Traffic shaping — Use a queue to control flow of outbound network traffic
- Task coordination for parallel processing — Use a queue to coordinate work between processes/threads
- Queues provide a handy architecture for producer-consumer problems where you want to distribute work and process it asynchronously.
A complete example
import threading
import concurrent.futures
import time
from queue import Queue
from typing import List, Any, Callable, Dict
import json
import asyncio
import random
from datetime import datetime, timezone
class MyQueue:
def __init__(self):
self.items = []
def size(self):
return len(self.items)
def enqueue(self, item):
self.items.append(item)
def dequeue(self):
if self.size() == 0:
return None
return self.items.pop(0)
class JobProcessor:
def __init__(self) -> None:
self.shared_queue = Queue()
self.lock = threading.Lock()
self.shared_data: Dict[str, List[str]] = {
"topic-1": [], "topic-2": [], "topic-3": [], "topic-4": [], "topic-5": []
}
def execute_one_job(self, job: Any, func: Callable):
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(func, job)
# Wait for the future to complete
try:
result = future.result()
print(f"Job {job} result: {result}")
except Exception as e:
print(f"Job {job} generated an exception: {e}")
def execute_many_jobs(self, jobs: List[Any], func: Callable):
with concurrent.futures.ThreadPoolExecutor(max_workers=len(jobs)) as executor:
futures = [executor.submit(func, job) for job in jobs]
# Iterate over completed futures
for future, job in zip(concurrent.futures.as_completed(futures), jobs):
try:
# Get the result of the job
result = future.result()
print(f"Job {job} result: {result}")
except Exception as e:
print(f"Job {job} generated an exception: {e}")
def writter(seff, data: List[str]):
def chunks(lst, chunk_size):
"""Yield successive chunks of the given list."""
for i in range(0, len(lst), chunk_size):
yield lst[i:i + chunk_size]
chunk_size = 50
chunked_array = list(chunks(data, chunk_size))
# Print each chunk
for i, chunk in enumerate(chunked_array):
print(f"Processing chunk {i + 1}: {chunk} \n")
def edge_message_writer(self):
while True:
while not self.shared_queue.empty():
data_json = self.shared_queue.get()
data = json.loads(data_json)
self.lock.acquire()
self.shared_data[data["topic"]].append(data["event"])
self.lock.release()
print("Processing messages...\n")
self.write_to_db()
print("done processing. waiting ...\n")
time.sleep(5)
def write_to_db(self):
self.lock.acquire()
jobs = [self.shared_data[key] for key in self.shared_data.keys() if len(self.shared_data[key]) > 0]
# clean up processed messages
for key in self.shared_data.keys():
if len(self.shared_data[key]) > 0:
self.shared_data[key] = []
self.lock.release()
for job in jobs:
print(f"writting to db. Data: {job}")
async def handle_messages(self):
time.sleep(0.1)
print("done processing ............")
def get_timestamp(self):
current_utc_time = datetime.now(timezone.utc)
formatted_timestamp = current_utc_time.strftime("%Y-%m-%d %H:%M:%S.%f%z")
return formatted_timestamp[:len(formatted_timestamp)-2]
def run_in_thread(self, coro):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
future = asyncio.ensure_future(coro)
loop.run_until_complete(future)
async def main(self):
while True:
id = random.randint(1,5)
data = {
"topic": f"topic-{id}",
"event": {
"timestamp": self.get_timestamp(),
"id": id,
"value": random.randint(0,10)
},
}
self.shared_queue.put(json.dumps(data))
time.sleep(0.1)
if __name__ == "__main__":
job = JobProcessor()
edge_thread = threading.Thread(target=job.edge_message_writer, args=())
cloud2device_thread = threading.Thread(target=job.run_in_thread, args=(job.handle_messages(),))
edge_thread.start()
cloud2device_thread.start()
asyncio.get_event_loop().run_until_complete(job.main())