queues are first in, first out (FIFO) abstract structures (i.e., items are removed in the same order they are added) that can be implemented with two arrays or with a dynamic structure (a dynamic array or a linked list), as long as items are added and removed from opposite sides.
queues support enqueue (add to one end, the tail) and dequeue (remove from the other end, the head).
if implemented with a dynamic array, a more efficient solution is to use a circular queue (ring buffer), i.e., a fixed-size array and two pointers to indicate the starting and ending positions.
- an advantage of circular queues is that we can use the spaces in front of the queue.
- in a normal queue, once the queue becomes full, we cannot insert the next element even if there is a space in front of the queue. but using the circular queue, we can use the space to store new values.
queues are often used in breadth-first search (where you store a list of nodes to be processed) or when implementing a cache.
in python, queues can be created with `collections.deque()` (and its methods `append()` and `popleft()`).
- a circular queue can be built with either arrays or linked lists (nodes).
- to build a ring with a fixed size array, any of the elements could be considered as the head.
class CircularQueue:
    """Fixed-capacity FIFO ring buffer tracked by a head and a tail index.

    Free slots hold ``None``, so ``None`` itself cannot be stored as a value.
    ``head`` is the index of the oldest element and ``tail`` the index of the
    most recently added one; both are only meaningful while the queue is
    non-empty. ``is_empty`` and ``is_full`` scan the backing list, so every
    operation costs O(size).
    """

    def __init__(self, size):
        """Create an empty queue that can hold at most ``size`` items."""
        self.head = 0
        self.tail = 0
        self.size = size
        self.queue = [None] * self.size

    def enqueue(self, value: int) -> bool:
        """Add ``value`` at the tail; return ``False`` if the queue is full."""
        if self.is_full():
            return False
        if self.is_empty():
            # Every slot is free, so restart the tail where the head already
            # is. Resetting only the head to 0 would leave the two indices
            # out of sync once the queue has been drained mid-array.
            self.tail = self.head
        # Advance (wrapping around) to the first free slot; the is_full()
        # guard above guarantees that one exists.
        while self.queue[self.tail] is not None:
            self.tail = (self.tail + 1) % self.size
        self.queue[self.tail] = value
        return True

    def dequeue(self) -> bool:
        """Drop the oldest item; return ``False`` if the queue is empty."""
        if self.is_empty():
            return False
        # Mark the slot as free again before moving the head forward.
        self.queue[self.head] = None
        self.head = (self.head + 1) % self.size
        return True

    def front(self) -> int:
        """Return the oldest item, or ``False`` if the queue is empty."""
        if self.is_empty():
            return False
        # Return the stored value as-is so falsy items such as 0 survive.
        return self.queue[self.head]

    def rear(self) -> int:
        """Return the newest item, or ``False`` if the queue is empty."""
        if self.is_empty():
            return False
        return self.queue[self.tail]

    def is_empty(self) -> bool:
        """Return ``True`` when no slot holds a value."""
        return all(slot is None for slot in self.queue)

    def is_full(self) -> bool:
        """Return ``True`` when every slot holds a value.

        A zero-capacity queue is both empty and full.
        """
        return all(slot is not None for slot in self.queue)
- to enqueue, you advance the tail index until you find a `None` (free) slot (even if it has to loop back):
while queue[self.tail]:
self.tail += 1
if self.tail == self.size:
self.tail = 0
self.queue[self.tail] = value
- to dequeue, you simply pop the head value:
value = self.queue[self.head]
self.queue[self.head] = None
self.head += 1
- as long as we know the length of the queue, we can instantly locate its tail based on this formula:
tail_index = (head_index + current_length - 1) % size
there is one occasion when the `tail` index is set to zero:
- when the enqueue operation adds to the last position in the array and tail has to loop back (`self.tail == self.size`).
there are two occasions when the `head` index is set to zero:
- when the queue is checked as empty
- when the dequeue operation popped the last element in the array and head has to loop back (`self.head == self.size`).
- another version uses only one index (for `head`) and calculates the tail with the equation:
(self.head + self.count - 1) % self.size
- and the next tail with:
(self.head + self.count) % self.size
- and the next `head` is always given by:
(self.head + 1) % self.size
- in the example below we also implement the empty and full checks using an extra counter variable `self.count`, which can be compared to `self.size` and used to validate `rear` and `front`.
class CircularQueue:
    """Fixed-capacity FIFO ring buffer tracked by a head index and a counter.

    The tail position is derived from ``head`` and ``count``, so every
    operation runs in constant time.
    """

    def __init__(self, size):
        """Create an empty queue that can hold at most ``size`` items."""
        self.head = 0
        self.count = 0
        self.queue = [0] * size
        self.size = size

    def _get_tail(self):
        """Return the index of the most recently added item."""
        return (self.head + self.count - 1) % self.size

    def _get_next_tail(self):
        """Return the index where the next item will be written."""
        return (self.head + self.count) % self.size

    def _get_next_head(self):
        """Return the index the head moves to after a dequeue."""
        return (self.head + 1) % self.size

    def enqueue(self, value: int) -> bool:
        """Add ``value`` at the tail; return ``False`` if the queue is full."""
        # Guard on fullness (not emptiness): an empty queue must accept items.
        if self.is_full():
            return False
        self.queue[self._get_next_tail()] = value
        self.count += 1
        return True

    def dequeue(self) -> bool:
        """Drop the oldest item; return ``False`` if the queue is empty."""
        if self.is_empty():
            return False
        self.head = self._get_next_head()
        self.count -= 1
        return True

    def front(self) -> int:
        """Return the oldest item, or ``False`` if the queue is empty."""
        if self.is_empty():
            return False
        return self.queue[self.head]

    def rear(self) -> int:
        """Return the newest item, or ``False`` if the queue is empty."""
        if self.is_empty():
            return False
        return self.queue[self._get_tail()]

    def is_empty(self) -> bool:
        """Return ``True`` when the queue holds no items."""
        return self.count == 0

    def is_full(self) -> bool:
        """Return ``True`` when no more items can be added.

        ``>=`` keeps a zero (or negative) capacity queue permanently full,
        which also protects the modulo in the index helpers.
        """
        return self.count >= self.size

    # camelCase names kept as aliases so existing callers keep working.
    Front = front
    Rear = rear
    isEmpty = is_empty
    isFull = is_full
a linked list could be more memory efficient, since it does not pre-allocate memory for unused capacity (and size of the queue is not "artificial" like before).
note that this queue is not thread-safe: the data structure could be corrupted in a multi-threaded environment (as race conditions could occur). to mitigate this problem, one could protect it with a lock.
class Node:
    """Singly linked list node holding one queued value."""

    def __init__(self, value, next=None):
        """Store ``value`` and an optional link to the following node."""
        self.value = value
        self.next = next


class Queue:
    """Bounded FIFO queue backed by a singly linked list of ``Node`` objects.

    ``head`` is the oldest node, ``tail`` the newest; both are ``None`` while
    the queue is empty.
    """

    def __init__(self, size):
        """Create an empty queue that can hold at most ``size`` items."""
        self.size = size
        self.count = 0
        self.head = None
        self.tail = None

    def enqueue(self, value: int) -> bool:
        """Add ``value`` at the tail; return ``False`` if the queue is full."""
        if self.is_full():
            return False
        node = Node(value)
        if self.is_empty():
            # First element: it is both the head and the tail. Linking a
            # second node here as well would store the value twice.
            self.head = self.tail = node
        else:
            self.tail.next = node
            self.tail = node
        self.count += 1
        return True

    def dequeue(self) -> bool:
        """Drop the oldest item; return ``False`` if the queue is empty."""
        if self.is_empty():
            return False
        self.head = self.head.next
        if self.head is None:
            # The queue just became empty: do not keep the removed node alive
            # through a stale tail reference.
            self.tail = None
        self.count -= 1
        return True

    def front(self) -> int:
        """Return the oldest item, or ``False`` if the queue is empty."""
        if self.is_empty():
            return False
        return self.head.value

    def rear(self) -> int:
        """Return the newest item, or ``False`` if the queue is empty."""
        if self.is_empty():
            return False
        return self.tail.value

    def is_empty(self) -> bool:
        """Return ``True`` when the queue holds no items."""
        return self.count == 0

    def is_full(self) -> bool:
        """Return ``True`` when no more items can be added.

        A non-positive ``size`` yields a queue that is always full.
        """
        return self.count >= self.size
class Logger:
    """Rate limiter that lets each distinct message through once per window.

    ``msg_queue`` holds ``(message, timestamp)`` pairs in arrival order and
    ``msg_set`` mirrors the messages currently in the queue for O(1)
    membership tests.
    """

    def __init__(self, window: int = 10):
        """Create a logger that suppresses repeats within ``window`` time units."""
        # Local import keeps the class usable without a module-level import.
        from collections import deque

        self.window = window
        self.msg_set = set()
        self.msg_queue = deque()

    def print_message(self, timestamp, message) -> bool:
        """Return ``True`` if ``message`` should be printed at ``timestamp``.

        A message is accepted when it has not been accepted within the last
        ``window`` time units. Eviction stops at the first unexpired entry,
        which assumes timestamps arrive in non-decreasing order (TODO confirm
        against callers).
        """
        # Evict entries whose window has fully elapsed.
        while self.msg_queue:
            msg, msg_timestamp = self.msg_queue[0]
            if timestamp - msg_timestamp < self.window:
                break
            self.msg_queue.popleft()
            # Each queued message is in the set exactly once, so this is safe.
            self.msg_set.remove(msg)

        if message in self.msg_set:
            return False
        self.msg_set.add(message)
        self.msg_queue.append((message, timestamp))
        return True
- given a stream of integers and a window size, the moving average in the sliding window can be calculated with a queue:
class MovingAverage:
    """Moving average over the last ``size`` values of an integer stream."""

    def __init__(self, size: int):
        """Create an empty window that keeps at most ``size`` values.

        Raises:
            ValueError: if ``size`` is smaller than 1.
        """
        # Local import keeps the class usable without a module-level import.
        from collections import deque

        if size < 1:
            raise ValueError("size must be a positive integer")
        self.size = size
        # A bounded deque drops the oldest value automatically once full.
        self.queue = deque(maxlen=size)

    def next(self, val: int) -> float:
        """Add ``val`` to the window and return the current window average."""
        self.queue.append(val)
        return sum(self.queue) / len(self.queue)