# StarfallBot/starfall/types.py (Python; listed as 29 lines, 801 B)

from queue import Queue
from typing import Any, NamedTuple, final
@final
class QueueMessage(NamedTuple):
sender: str
receiver: str
message: Any
variable: str | None = None
@final
class SnapshotQueue(Queue[QueueMessage]):
    """A ``Queue`` of ``QueueMessage`` items that can be inspected and filtered.

    Both extra methods operate on the underlying container while holding the
    queue's own ``mutex``, so they are serialized against ``put()``/``get()``.
    """

    def snapshot(self) -> list[QueueMessage]:
        """Return a copy of the pending messages, in queue order.

        The queue itself is left untouched; the returned list is independent
        of later changes to the queue.
        """
        with self.mutex:
            return list(self.queue)

    def get_all_for_receiver(self, receiver: str) -> list[QueueMessage]:
        """Remove and return every pending message addressed to *receiver*.

        Args:
            receiver: Value compared against each message's ``receiver`` field.

        Returns:
            The removed messages in their original queue order; an empty list
            when nothing matches. Messages for other receivers keep their
            relative order in the queue.

        Note:
            This never blocks waiting for messages, and it does not call
            ``task_done()`` for the messages it removes.
        """
        res: list[QueueMessage] = []
        kept: list[QueueMessage] = []
        with self.mutex:
            # Partition in a single pass. Only the ``receiver`` field is
            # compared, so payloads are never tested for equality.
            for entry in self.queue:
                (res if entry.receiver == receiver else kept).append(entry)
            if res:
                self.queue.clear()
                self.queue.extend(kept)
                # Slots were freed: wake producers blocked in ``put()`` on a
                # bounded queue, as ``Queue.get()`` does for each removal.
                self.not_full.notify(len(res))
        return res