Message queue abstraction for Python?

09 Sep.,2023

 

The most neutral way to handle a message queue is to just use a database to store your messages. If your app get's more complex you can switch to something more sophisticated.

Right now I'm using PeeWee ORM to handle the messages. During testing the database is set to use an SQLite instance. In production the postgres server is set. You could just use the sqlite if the app doesn't use a lot of write concurrency.

from enum import Enum
from peewee import (  # type: ignore
    Model,
    CharField,
    DateTimeField,
    ForeignKeyField,
    Proxy,
    UUIDField,
    ModelSelect,
)
from peewee_extra_fields import EnumField  # type: ignore

PRODUCTION_DB_URL = "postgres://test_db_z9du_user:b5TWChOlCpKQRHcAg8u64kUpVIjNba5O@dpg-cc8ltahgp3jhesr2u9u0-a.oregon-postgres.render.com/test_db_z9du"  # pylint: disable=line-too-long
TEST_DB_PATH = str(HERE.parent / "data/test.db")
SQLITE_URL = f"sqlite:////{TEST_DB_PATH}".replace("\\", "/")
DB_URL = SQLITE_URL if IS_TESTING else PRODUCTION_DB_URL
if IS_TESTING:
    db_dir = os.path.dirname(TEST_DB_PATH)
    if not os.path.exists(db_dir):
        print(f"Creating test db directory {db_dir}")
        os.makedirs(db_dir, exist_ok=True)
    os.makedirs(db_dir, exist_ok=True)


database_proxy = Proxy()
database_proxy.initialize(connect(DB_URL))


class BaseModel(Model):
    """A base model that will use our Sqlite database."""

    class Meta:  # pylint: disable=too-few-public-methods
        """Meta class."""

        database = database_proxy

database_proxy = Proxy()
database_proxy.initialize(connect(DB_URL))

class MessageStatusEnum(Enum):
    """Enum class for message status"""

    QUEUED = 1
    PROCESSING = 2
    ERROR = 3


class Message(BaseModel):
    """Message model."""

    user = ForeignKeyField(User, backref="messages", primary_key=True)
    type = CharField(index=True)
    message = CharField(null=True)
    last_error_msg = CharField(null=True)
    created = DateTimeField(index=True, default=datetime.now)
    status = EnumField(
        index=True, enum=MessageStatusEnum, default=MessageStatusEnum.QUEUED
    )  # MessageStatusEnum.QUEUED or MessageStatusEnum.PROCESSING

Example of how to use message queue:

def pop_next_email_drm_task_for_processing(self) -> Message | None:
    """Pop the next email drm task."""
    try:
        with database_proxy.atomic():
            # get the first one
            cursor = (
                Message.select()
                .where(
                    Message.status == MessageStatusEnum.QUEUED and Message.type == "email_drm"
                )
                .order_by(Message.created)
                .limit(1)
            )
            message = cursor.first()
            if message is None:
                return None
            message.status = MessageStatusEnum.PROCESSING
            message.save()
            return message

For more information best message queue, please get in touch with us!