TT-Bot implements a queue management system that controls concurrent operations and prevents resource exhaustion.

Overview

The QueueManager singleton provides per-user concurrency limiting for video info extraction:
# From misc/queue_manager.py:15
class QueueManager:
    """
    Singleton queue manager for controlling concurrent operations.

    Features:
    - Per-user tracking for info queue (limits concurrent requests per user)
    - Bypass option for inline downloads
    """

Architecture

Singleton pattern

The queue manager uses a singleton to ensure only one instance exists:
# From misc/queue_manager.py:54
@classmethod
def get_instance(cls) -> QueueManager:
    """Get or create the singleton instance."""
    if cls._instance is None:
        queue_config = config["queue"]
        cls._instance = cls(
            max_user_queue=queue_config["max_user_queue_size"],
        )
    return cls._instance

Per-user tracking

The manager tracks how many requests each user has in the queue:
# From misc/queue_manager.py:40
def __init__(self, max_user_queue: int):
    """
    Initialize the queue manager.

    Args:
        max_user_queue: Maximum videos per user in info queue
    """
    self.max_user_queue = max_user_queue
    self._user_info_counts: dict[int, int] = {}
    self._lock = asyncio.Lock()

Configuration

Queue limits are configured in .env:
MAX_USER_QUEUE_SIZE=3
This limits each user to 3 concurrent video info extractions. Set to 0 for unlimited.
# From misc/queue_manager.py:91
max_user_queue_size: int
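How the environment variable might map onto `queue_config` can be sketched as follows (an illustration only; the project's actual config loader may differ):

```python
import os

# Sketch (not the project's actual loader): parse MAX_USER_QUEUE_SIZE from the
# environment, defaulting to the documented value of 3; 0 disables the limit.
queue_config = {
    "max_user_queue_size": int(os.environ.get("MAX_USER_QUEUE_SIZE", "3")),
}
```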

Usage

Basic usage with context manager

# From handlers/get_video.py:153
queue = QueueManager.get_instance()

# Acquire info queue slot with per-user limit
async with queue.info_queue(message.chat.id) as acquired:
    if not acquired:
        # User limit exceeded
        if status_message:
            await status_message.delete()
        if not group_chat:
            await message.reply(
                locale[lang]["error_queue_full"].format(
                    queue.get_user_queue_count(message.chat.id)
                ),
                reply_markup=try_again_button(lang),
            )
        return

    # Fetch video info
    video_info = await api.video(video_link)

Pre-check before acquiring

The handler checks queue limits before starting processing:
# From handlers/get_video.py:131
# Check per-user queue limit before proceeding (0 = no limit)
max_queue = queue_config["max_user_queue_size"]
if max_queue > 0:
    user_queue_count = queue.get_user_queue_count(message.chat.id)
    if user_queue_count >= max_queue:
        if not group_chat:
            await message.reply(
                locale[lang]["error_queue_full"].format(user_queue_count),
                reply_markup=try_again_button(lang),
            )
        return
This provides instant feedback before showing processing status.

Bypass for inline mode

Inline downloads can bypass per-user limits:
# From misc/queue_manager.py:121
@asynccontextmanager
async def info_queue(
    self, user_id: int, bypass_user_limit: bool = False
) -> AsyncGenerator[bool, None]:
    """
    Context manager for info queue with per-user limiting.

    Args:
        user_id: Telegram user/chat ID
        bypass_user_limit: If True, skip per-user limit check (for inline)
    """
Usage:
async with queue.info_queue(user_id, bypass_user_limit=True) as acquired:
    # Always acquires for inline mode
    video_info = await api.video_with_retry(...)
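The body of `info_queue` is not shown above, but a typical implementation of this pattern pairs an acquire with a guaranteed release in `finally`. The `MiniQueue` class below is a self-contained sketch of that shape, not the project's exact code:

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator


class MiniQueue:
    """Sketch of the acquire/yield/release pattern behind info_queue."""

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue
        self._counts: dict[int, int] = {}
        self._lock = asyncio.Lock()

    async def acquire(self, user_id: int, bypass: bool) -> bool:
        async with self._lock:
            if not bypass and self.max_user_queue > 0:
                if self._counts.get(user_id, 0) >= self.max_user_queue:
                    return False  # user already at limit
            self._counts[user_id] = self._counts.get(user_id, 0) + 1
            return True

    async def release(self, user_id: int) -> None:
        async with self._lock:
            if user_id in self._counts:
                self._counts[user_id] -= 1
                if self._counts[user_id] <= 0:
                    del self._counts[user_id]

    @asynccontextmanager
    async def info_queue(
        self, user_id: int, bypass_user_limit: bool = False
    ) -> AsyncGenerator[bool, None]:
        acquired = await self.acquire(user_id, bypass_user_limit)
        try:
            yield acquired
        finally:
            # Release only if the slot was actually taken, even on exceptions
            if acquired:
                await self.release(user_id)
```

Because the release lives in `finally`, a slot is returned even if video processing raises inside the `async with` block.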

Slot acquisition

Acquire logic

# From misc/queue_manager.py:72
async def acquire_info_for_user(
    self, user_id: int, bypass_user_limit: bool = False
) -> bool:
    """
    Acquire info slot for a user.

    Returns:
        True if acquired successfully, False if user limit exceeded
    """
    async with self._lock:
        if not bypass_user_limit and self.max_user_queue > 0:
            current_count = self._user_info_counts.get(user_id, 0)
            if current_count >= self.max_user_queue:
                logger.debug(
                    f"User {user_id} rejected: {current_count}/{self.max_user_queue} in queue"
                )
                return False

        # Increment user count and confirm acquisition
        self._user_info_counts[user_id] = self._user_info_counts.get(user_id, 0) + 1
        logger.debug(
            f"User {user_id} acquired info slot "
            f"(user_count={self._user_info_counts.get(user_id, 0)})"
        )
        return True

Release logic

# From misc/queue_manager.py:103
async def release_info_for_user(self, user_id: int) -> None:
    """Release info slot for a user.

    This method is async to properly acquire the lock and prevent
    race conditions when multiple coroutines release concurrently.
    """
    async with self._lock:
        if user_id in self._user_info_counts:
            self._user_info_counts[user_id] -= 1
            if self._user_info_counts[user_id] <= 0:
                del self._user_info_counts[user_id]
The release is async to prevent race conditions during concurrent releases.
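The decrement-and-delete step can be sketched in isolation (plain dict, no lock, for illustration only). Deleting the key at zero keeps the dict from accumulating an entry for every user ever seen:

```python
def release(counts: dict[int, int], user_id: int) -> None:
    # Decrement, and drop the key at zero so the dict stays small
    if user_id in counts:
        counts[user_id] -= 1
        if counts[user_id] <= 0:
            del counts[user_id]


counts = {42: 2}
release(counts, 42)
assert counts == {42: 1}
release(counts, 42)
assert counts == {}   # entry removed, not left at 0
release(counts, 42)   # releasing with no entry is a no-op
assert counts == {}
```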

Queue monitoring

Get user queue count

# From misc/queue_manager.py:68
def get_user_queue_count(self, user_id: int) -> int:
    """Get current queue count for a user."""
    return self._user_info_counts.get(user_id, 0)

Get active users count

# From misc/queue_manager.py:148
@property
def active_users_count(self) -> int:
    """Number of users currently with items in the info queue."""
    return len(self._user_info_counts)
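Together, these two getters support simple monitoring. A hypothetical stats helper (`format_stats` and its output format are illustrative, not from the project) operating on a counts dict shaped like `_user_info_counts`:

```python
def format_stats(counts: dict[int, int]) -> str:
    # counts mirrors QueueManager._user_info_counts: user_id -> in-flight requests
    active_users = len(counts)
    total_in_flight = sum(counts.values())
    return f"queue: {active_users} active users, {total_in_flight} requests in flight"


print(format_stats({111: 2, 222: 1}))
# → queue: 2 active users, 3 requests in flight
```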

Error messages

When queue is full, users see a localized message with retry button:
# From handlers/get_video.py:32
def try_again_button(lang: str):
    """Create a 'Try Again' button for queue full error."""
    keyb = InlineKeyboardBuilder()
    keyb.button(
        text=locale[lang]["try_again_button"],
        callback_data=RETRY_CALLBACK_PREFIX,
    )
    return keyb.as_markup()
The retry button re-processes the original message:
# From handlers/get_video.py:322
@video_router.callback_query(F.data == RETRY_CALLBACK_PREFIX)
async def handle_retry_callback(callback: CallbackQuery):
    """Handle 'Try Again' button click for queue full error."""
    # Get the original message that contains the TikTok link
    original_message = callback.message.reply_to_message

    if not original_message or not original_message.text:
        await callback.answer("Original message not found", show_alert=True)
        return

    # Delete the error message with the button
    try:
        if hasattr(callback.message, "delete"):
            await callback.message.delete()
    except TelegramBadRequest:
        logging.debug("Retry button message already deleted")

    # Re-process the original message
    await send_tiktok_video(original_message)

Thread safety

All queue operations are protected by an async lock:
# From misc/queue_manager.py:49
self._lock = asyncio.Lock()
This ensures:
  • No race conditions during concurrent acquires/releases
  • Accurate count tracking
  • Safe dictionary modifications
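The lock's role can be demonstrated with a standalone sketch: many coroutines race to acquire slots for the same user, and the lock makes the check-then-increment atomic so exactly `max_user_queue` of them succeed:

```python
import asyncio


async def main() -> list[bool]:
    lock = asyncio.Lock()
    counts: dict[int, int] = {}
    max_user_queue = 3

    async def acquire(user_id: int) -> bool:
        # The lock makes the check and the increment a single atomic step
        async with lock:
            if counts.get(user_id, 0) >= max_user_queue:
                return False
            counts[user_id] = counts.get(user_id, 0) + 1
            return True

    # Ten coroutines race for the same user's slots
    return await asyncio.gather(*(acquire(7) for _ in range(10)))


results = asyncio.run(main())
assert sum(results) == 3  # exactly max_user_queue acquisitions succeed
```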

Per-user limits, no global queue

Important: The bot only limits concurrent info extractions per user. There is no global send queue:
# From handlers/get_video.py:199
# Send video/images (no global send queue - per-user limit only)
if video_info.is_slideshow:  # Process images
This design:
  • Prevents one user from blocking others during slow extractions
  • Allows unlimited parallel sends (limited by Telegram rate limits)
  • Balances resource usage with responsiveness

Testing support

The singleton can be reset for testing:
# From misc/queue_manager.py:64
@classmethod
def reset_instance(cls) -> None:
    """Reset the singleton instance (useful for testing)."""
    cls._instance = None
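Why this matters for test isolation can be shown with a minimal stand-in class (a sketch, not the real `QueueManager`): after a reset, the next `get_instance` call builds a fresh object, so one test's queue state cannot leak into the next:

```python
class Mgr:
    """Sketch of the singleton-with-reset pattern used for testing."""
    _instance = None

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue

    @classmethod
    def get_instance(cls) -> "Mgr":
        if cls._instance is None:
            cls._instance = cls(max_user_queue=3)
        return cls._instance

    @classmethod
    def reset_instance(cls) -> None:
        cls._instance = None


first = Mgr.get_instance()
assert first is Mgr.get_instance()  # same object on repeated access
Mgr.reset_instance()
second = Mgr.get_instance()
assert first is not second  # each test can start from a clean instance
```

In a pytest suite this would typically run in a fixture before each test.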

Logging

Queue operations are logged at DEBUG level:
logger.debug(
    f"User {user_id} acquired info slot "
    f"(user_count={self._user_info_counts.get(user_id, 0)})"
)

logger.debug(
    f"User {user_id} released info slot "
    f"(user_count={self._user_info_counts.get(user_id, 0)})"
)
Enable DEBUG logging to monitor queue behavior:
LOG_LEVEL=DEBUG