The most neutral way to handle a message queue is to just use a database to store your messages. If your app get's more complex you can switch to something more sophisticated.
Right now I'm using PeeWee ORM to handle the messages. During testing the database is set to use an SQLite instance. In production the postgres server is set. You could just use the sqlite if the app doesn't use a lot of write concurrency.
from enum import Enum
from peewee import ( # type: ignore
Model,
CharField,
DateTimeField,
ForeignKeyField,
Proxy,
UUIDField,
ModelSelect,
)
from peewee_extra_fields import EnumField # type: ignore
PRODUCTION_DB_URL = "postgres://test_db_z9du_user:b5TWChOlCpKQRHcAg8u64kUpVIjNba5O@dpg-cc8ltahgp3jhesr2u9u0-a.oregon-postgres.render.com/test_db_z9du" # pylint: disable=line-too-long
TEST_DB_PATH = str(HERE.parent / "data/test.db")
SQLITE_URL = f"sqlite:////{TEST_DB_PATH}".replace("\\", "/")
DB_URL = SQLITE_URL if IS_TESTING else PRODUCTION_DB_URL
if IS_TESTING:
db_dir = os.path.dirname(TEST_DB_PATH)
if not os.path.exists(db_dir):
print(f"Creating test db directory {db_dir}")
os.makedirs(db_dir, exist_ok=True)
os.makedirs(db_dir, exist_ok=True)
database_proxy = Proxy()
database_proxy.initialize(connect(DB_URL))
class BaseModel(Model):
"""A base model that will use our Sqlite database."""
class Meta: # pylint: disable=too-few-public-methods
"""Meta class."""
database = database_proxy
database_proxy = Proxy()
database_proxy.initialize(connect(DB_URL))
class MessageStatusEnum(Enum):
"""Enum class for message status"""
QUEUED = 1
PROCESSING = 2
ERROR = 3
class Message(BaseModel):
"""Message model."""
user = ForeignKeyField(User, backref="messages", primary_key=True)
type = CharField(index=True)
message = CharField(null=True)
last_error_msg = CharField(null=True)
created = DateTimeField(index=True, default=datetime.now)
status = EnumField(
index=True, enum=MessageStatusEnum, default=MessageStatusEnum.QUEUED
) # MessageStatusEnum.QUEUED or MessageStatusEnum.PROCESSING
Example of how to use message queue:
def pop_next_email_drm_task_for_processing(self) -> Message | None:
"""Pop the next email drm task."""
try:
with database_proxy.atomic():
# get the first one
cursor = (
Message.select()
.where(
Message.status == MessageStatusEnum.QUEUED and Message.type == "email_drm"
)
.order_by(Message.created)
.limit(1)
)
message = cursor.first()
if message is None:
return None
message.status = MessageStatusEnum.PROCESSING
message.save()
return message
For more information best message queue, please get in touch with us!