Skip to main content

Overview

The QueueManager class controls concurrent operations with per-user limiting. It prevents server overload and ensures fair resource allocation across users. Location: misc/queue_manager.py

Architecture

QueueManager is a singleton that tracks active requests per user. It uses asyncio locks for thread-safe operation and provides context managers for automatic resource cleanup.

Design Pattern

  • Singleton: One instance shared across the application
  • Context Manager: Automatic acquisition and release
  • Per-User Tracking: Independent limits per user/chat

Class Definition

class QueueManager:
    """Singleton queue manager for controlling concurrent operations."""

    _instance: QueueManager | None = None

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue
        self._user_info_counts: dict[int, int] = {}
        self._lock = asyncio.Lock()
Attributes:
  • max_user_queue - Maximum concurrent requests per user
  • _user_info_counts - Dict mapping user_id to active request count
  • _lock - Asyncio lock for thread-safe operations

Methods

get_instance()

Location: misc/queue_manager.py:54 Gets or creates the singleton instance.
@classmethod
def get_instance(cls) -> QueueManager:
    """Get or create the singleton instance."""
    if cls._instance is None:
        queue_config = config["queue"]
        cls._instance = cls(
            max_user_queue=queue_config["max_user_queue_size"],
        )
    return cls._instance
Returns: QueueManager singleton instance Example:
from misc.queue_manager import QueueManager

queue = QueueManager.get_instance()

reset_instance()

Location: misc/queue_manager.py:64 Resets the singleton instance (useful for testing).
@classmethod
def reset_instance(cls) -> None:
    """Reset the singleton instance (useful for testing)."""
    cls._instance = None

get_user_queue_count()

Location: misc/queue_manager.py:68 Gets current active request count for a user.
def get_user_queue_count(self, user_id: int) -> int:
    """Get current queue count for a user."""
    return self._user_info_counts.get(user_id, 0)
Parameters:
  • user_id - Telegram user/chat ID
Returns: Number of active requests for user Example:
queue = QueueManager.get_instance()
count = queue.get_user_queue_count(message.chat.id)
print(f"User has {count} active requests")

acquire_info_for_user()

Location: misc/queue_manager.py:72 Acquires a queue slot for a user.
async def acquire_info_for_user(
    self, user_id: int, bypass_user_limit: bool = False
) -> bool:
    """
    Acquire info slot for a user.

    Args:
        user_id: Telegram user/chat ID
        bypass_user_limit: If True, skip per-user limit check (for inline)

    Returns:
        True if acquired successfully, False if user limit exceeded
    """
    async with self._lock:
        if not bypass_user_limit and self.max_user_queue > 0:
            current_count = self._user_info_counts.get(user_id, 0)
            if current_count >= self.max_user_queue:
                logger.debug(
                    f"User {user_id} rejected: {current_count}/{self.max_user_queue} in queue"
                )
                return False

        # Increment user count
        self._user_info_counts[user_id] = self._user_info_counts.get(user_id, 0) + 1

    return True
Parameters:
  • user_id - Telegram user/chat ID
  • bypass_user_limit - If True, skip limit check (for inline downloads)
Returns: True if slot acquired, False if limit exceeded Example:
queue = QueueManager.get_instance()

# Normal acquisition (with limit check)
acquired = await queue.acquire_info_for_user(user_id)
if not acquired:
    await message.reply("Queue full, please wait...")
    return

# Bypass limit (for inline downloads)
acquired = await queue.acquire_info_for_user(user_id, bypass_user_limit=True)

release_info_for_user()

Location: misc/queue_manager.py:103 Releases a queue slot for a user.
async def release_info_for_user(self, user_id: int) -> None:
    """Release info slot for a user.

    This method is async to properly acquire the lock and prevent
    race conditions when multiple coroutines release concurrently.
    """
    async with self._lock:
        if user_id in self._user_info_counts:
            self._user_info_counts[user_id] -= 1
            if self._user_info_counts[user_id] <= 0:
                del self._user_info_counts[user_id]
Parameters:
  • user_id - Telegram user/chat ID
Note: Always pair with acquire_info_for_user() or use info_queue() context manager

info_queue() (Context Manager)

Location: misc/queue_manager.py:120 Context manager for automatic queue slot management.
@asynccontextmanager
async def info_queue(
    self, user_id: int, bypass_user_limit: bool = False
) -> AsyncGenerator[bool, None]:
    """
    Context manager for info queue with per-user limiting.

    Args:
        user_id: Telegram user/chat ID
        bypass_user_limit: If True, skip per-user limit check (for inline)

    Yields:
        True if acquired successfully, False if user limit exceeded

    Usage:
        async with queue.info_queue(user_id) as acquired:
            if not acquired:
                await message.reply("Queue full, please wait...")
                return
            # Do work...
    """
    acquired = await self.acquire_info_for_user(user_id, bypass_user_limit)
    try:
        yield acquired
    finally:
        if acquired:
            await self.release_info_for_user(user_id)
Parameters:
  • user_id - Telegram user/chat ID
  • bypass_user_limit - If True, skip limit check
Yields: True if slot acquired, False if limit exceeded Ensures: Automatic release in finally block Example:
from misc.queue_manager import QueueManager

queue = QueueManager.get_instance()

# Normal usage (with limit check)
async with queue.info_queue(message.chat.id) as acquired:
    if not acquired:
        await message.reply(
            locale[lang]["error_queue_full"].format(
                queue.get_user_queue_count(message.chat.id)
            )
        )
        return
    
    # Process download (slot automatically released on exit)
    video_info = await api.video(video_link)
    await send_video_result(message.chat.id, video_info, lang, file_mode)

# Inline usage (bypass limit)
async with queue.info_queue(user_id, bypass_user_limit=True) as acquired:
    if not acquired:
        await bot.edit_message_text(
            inline_message_id=message_id,
            text=locale[lang]["error"]
        )
        return
    
    # Process inline download
    video_info = await api.video(video_link)

active_users_count (Property)

Location: misc/queue_manager.py:148 Gets number of users with active requests.
@property
def active_users_count(self) -> int:
    """Number of users currently with items in the info queue."""
    return len(self._user_info_counts)
Returns: Count of users with active requests Example:
queue = QueueManager.get_instance()
print(f"{queue.active_users_count} users currently downloading")

Configuration

Queue settings are configured via data/config.py:
config["queue"] = {
    "max_user_queue_size": 3,  # Max concurrent requests per user (0 = unlimited)
}
Environment Variable: MAX_USER_QUEUE_SIZE

Usage Patterns

Standard Download Handler

from misc.queue_manager import QueueManager
from data.config import config, locale

queue = QueueManager.get_instance()
queue_config = config["queue"]

# Pre-check before acquiring slot
max_queue = queue_config["max_user_queue_size"]
if max_queue > 0:
    user_queue_count = queue.get_user_queue_count(message.chat.id)
    if user_queue_count >= max_queue:
        await message.reply(
            locale[lang]["error_queue_full"].format(user_queue_count)
        )
        return

# Acquire slot and process
async with queue.info_queue(message.chat.id) as acquired:
    if not acquired:
        # Shouldn't happen due to pre-check, but handle anyway
        await message.reply(locale[lang]["error"])
        return
    
    # Process download
    video_info = await api.video(video_link)
    await send_video_result(message.chat.id, video_info, lang, file_mode)

Inline Download Handler

from misc.queue_manager import QueueManager

queue = QueueManager.get_instance()

# Bypass user limit for inline downloads
async with queue.info_queue(user_id, bypass_user_limit=True) as acquired:
    if not acquired:
        await bot.edit_message_text(
            inline_message_id=message_id,
            text="Error: Server busy"
        )
        return
    
    # Process download
    video_info = await api.video(video_link)
    file_id = await upload_video_to_storage(video_info.data, video_info)
    await bot.edit_message_media(
        inline_message_id=message_id,
        media=InputMediaVideo(media=file_id)
    )

Why Per-User Limits?

Without Limits:
  • One user could spam requests
  • Server resources exhausted
  • Other users blocked
With Per-User Limits:
  • Fair resource allocation
  • Prevents abuse
  • Better user experience for all
Bypass Option:
  • Inline downloads use bypass_user_limit=True
  • Allows inline queries even when user’s regular queue is full
  • Prevents inline query failures

Thread Safety

All operations use asyncio.Lock for thread safety:
async with self._lock:
    # Atomic operations on shared state
    self._user_info_counts[user_id] = count
This ensures:
  • No race conditions
  • Accurate counts
  • Safe concurrent access

Logging

Queue operations are logged for debugging:
logger.debug(f"User {user_id} acquired info slot (user_count={count})")
logger.debug(f"User {user_id} released info slot (user_count={count})")
logger.debug(f"User {user_id} rejected: {count}/{max} in queue")

Best Practices

  1. Always use context manager - Ensures automatic cleanup
  2. Pre-check before acquiring - Better UX (show error earlier)
  3. Use bypass sparingly - Only for inline downloads
  4. Set reasonable limits - Balance throughput and fairness (default: 3)
  5. Handle acquisition failure - Always check acquired flag
  6. Don’t hold slots unnecessarily - Keep work inside context manager minimal
  7. Use get_instance() - Never create QueueManager directly

Testing

from misc.queue_manager import QueueManager

# Reset singleton for test isolation
QueueManager.reset_instance()

# Create instance with test config
queue = QueueManager.get_instance()

# Test user limits
await queue.acquire_info_for_user(123)
assert queue.get_user_queue_count(123) == 1

await queue.release_info_for_user(123)
assert queue.get_user_queue_count(123) == 0