29 lines
801 B
Python
29 lines
801 B
Python
from queue import Queue
|
|
from typing import Any, NamedTuple, final
|
|
|
|
|
|
@final
|
|
class QueueMessage(NamedTuple):
|
|
sender: str
|
|
receiver: str
|
|
message: Any
|
|
variable: str | None = None
|
|
|
|
|
|
@final
class SnapshotQueue(Queue[QueueMessage]):
    """A ``Queue`` of ``QueueMessage`` items that can be inspected and filtered.

    Both helpers below hold the queue's internal ``mutex`` while they touch
    the underlying deque, so they are serialized with ``put``/``get`` calls.
    """

    def snapshot(self) -> list[QueueMessage]:
        """Return a list copy of the queued messages, in queue order.

        The queue itself is left unchanged.
        """
        with self.mutex:
            return list(self.queue)

    def get_all_for_receiver(self, receiver: str) -> list[QueueMessage]:
        """Remove and return every queued message addressed to *receiver*.

        Args:
            receiver: Value compared against each message's ``receiver`` field.

        Returns:
            The removed messages in their original queue order, or an empty
            list when nothing matches. Messages for other receivers remain
            queued in their original relative order.
        """
        matched: list[QueueMessage] = []
        kept: list[QueueMessage] = []
        with self.mutex:
            # Partition in a single pass. Only the ``receiver`` field is
            # compared, so message payloads are never tested for equality
            # (which ``deque.index`` would do, and which may be expensive or
            # raise for payloads with an unusual ``__eq__``).
            for entry in self.queue:
                if entry.receiver == receiver:
                    matched.append(entry)
                else:
                    kept.append(entry)
            if matched:
                # Mutate the existing deque rather than rebinding
                # ``self.queue`` so the base class keeps using the same object.
                self.queue.clear()
                self.queue.extend(kept)
                # Slots were freed: wake producers that may be blocked in
                # ``put()`` on a bounded queue, mirroring what ``Queue.get``
                # does after removing an item.
                self.not_full.notify(len(matched))
        return matched
|