Skip to main content

Overview

The database module provides SQLAlchemy-based data persistence with async support. It uses a repository pattern through db_service.py to abstract database operations. Stack:
  • SQLAlchemy 2.0 (async)
  • PostgreSQL (via asyncpg)
  • Declarative models

Architecture

Module Structure

data/
├── database.py        # Engine, session, initialization
├── db_service.py      # Repository functions
└── models/
    ├── __init__.py    # Model exports
    ├── users.py       # Users model
    ├── video.py       # Video model
    └── music.py       # Music model

Initialization Flow

from data.database import initialize_database_components, init_db

# 1. Initialize engine and session factory (once at startup)
initialize_database_components(db_url)

# 2. Create tables
await init_db()

# 3. Use db_service functions for all operations
from data.db_service import get_user, create_user
user = await get_user(user_id)

database.py

Location: data/database.py Core database engine and session management.

Global Variables

engine = None              # SQLAlchemy async engine
async_session = None       # Session factory
Base = declarative_base()  # Model base class

initialize_database_components()

Location: data/database.py:17 CRITICAL: Must be called before any database operations.
def initialize_database_components(db_url: str):
    """
    Initialize database engine and session factory.
    
    Call this function once during application startup before any database operations.
    Must be called before using init_db(), get_db(), or get_session().
    
    Args:
        db_url: Database connection URL string
        
    Raises:
        RuntimeError: If called multiple times (will overwrite existing globals)
    """
    global engine, async_session

    engine_url = get_async_db_url(db_url)
    engine = create_async_engine(engine_url, echo=False)
    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
Example:
from data.database import initialize_database_components
from data.config import config

# In main.py or startup
db_url = config["database"]["url"]
initialize_database_components(db_url)

init_db()

Location: data/database.py:51 Creates all database tables.
async def init_db():
    # Ensure engine is initialized before calling this
    if engine is None:
        raise RuntimeError(
            "Database engine not initialized. Call initialize_database_components first."
        )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
Raises: RuntimeError if initialize_database_components() not called

get_session()

Location: data/database.py:71 Creates a database session (used internally by db_service).
async def get_session() -> AsyncSession:
    # Ensure async_session is initialized
    if async_session is None:
        raise RuntimeError(
            "Database session factory not initialized. Call initialize_database_components first."
        )
    async with async_session() as session:
        return session
Returns: AsyncSession instance

Models

Location: data/models/

Users Model

Location: data/models/users.py Stores user/chat information and settings.
from sqlalchemy import Column, String, BigInteger, Boolean, Integer
from data.database import Base

class Users(Base):
    __tablename__ = "users"

    user_id = Column(BigInteger, primary_key=True, unique=True, nullable=False)
    registered_at = Column(BigInteger, nullable=True)  # Unix timestamp
    lang = Column(String, default='en', nullable=False)
    link = Column(String, nullable=True)  # Referral link
    file_mode = Column(Boolean, default=False, nullable=False)
    ad_count = Column(Integer, default=0, nullable=False)
    ad_cooldown = Column(BigInteger, default=0, nullable=False)  # Unix timestamp
Fields:
  • user_id - Telegram user/chat ID (primary key)
  • registered_at - Registration timestamp
  • lang - Language code (en, ru, es, etc.)
  • link - Referral link parameter
  • file_mode - Send as file (True) or media (False)
  • ad_count - Number of downloads since last ad
  • ad_cooldown - Timestamp when ad cooldown expires

Video Model

Location: data/models/video.py Records video/slideshow downloads for analytics.
from sqlalchemy import Column, String, BigInteger, Boolean, ForeignKey
from data.database import Base

class Video(Base):
    __tablename__ = "videos"

    pk_id = Column(BigInteger, primary_key=True, autoincrement=True, nullable=False)
    user_id = Column(BigInteger, ForeignKey("users.user_id"), nullable=False)
    downloaded_at = Column(BigInteger, nullable=True)  # Unix timestamp
    video_link = Column(String, nullable=False)
    is_images = Column(Boolean, default=False, nullable=False)  # Slideshow?
    is_processed = Column(Boolean, default=False, nullable=False)  # Image converted?
    is_inline = Column(Boolean, default=False, nullable=False)  # Inline query?
Fields:
  • pk_id - Auto-incrementing primary key
  • user_id - Foreign key to Users.user_id
  • downloaded_at - Download timestamp
  • video_link - TikTok/Instagram URL
  • is_images - True for slideshows, False for videos
  • is_processed - True if images were converted/optimized
  • is_inline - True if downloaded via inline query

Music Model

Location: data/models/music.py Records music/audio extractions.
from sqlalchemy import Column, BigInteger, ForeignKey
from data.database import Base

class Music(Base):
    __tablename__ = "music"

    pk_id = Column(BigInteger, primary_key=True, autoincrement=True, nullable=False)
    user_id = Column(BigInteger, ForeignKey("users.user_id"), nullable=False)
    downloaded_at = Column(BigInteger, nullable=True)  # Unix timestamp
    video_id = Column(BigInteger, nullable=False)  # TikTok video ID
Fields:
  • pk_id - Auto-incrementing primary key
  • user_id - Foreign key to Users.user_id
  • downloaded_at - Download timestamp
  • video_id - TikTok video ID

db_service.py

Location: data/db_service.py Repository pattern implementation - all database operations go through these functions.

User Management

get_user()

Location: data/db_service.py:11 Retrieves user by ID.
async def get_user(user_id: int) -> Optional[Users]:
    async with await get_session() as db:
        stmt = select(Users).where(Users.user_id == user_id)
        result = await db.execute(stmt)
        return result.scalar_one_or_none()
Returns: Users object or None

create_user()

Location: data/db_service.py:18 Creates new user record.
async def create_user(user_id: int, lang: str, link: Optional[str] = None) -> Users:
    async with await get_session() as db:
        time_now = int(datetime.now().timestamp())
        user = Users(
            user_id=user_id,
            registered_at=time_now,
            lang=lang,
            link=link,
            ad_count=0,
            ad_cooldown=0,
        )
        db.add(user)
        await db.commit()
        return user
Parameters:
  • user_id - Telegram user/chat ID
  • lang - Language code
  • link - Optional referral link parameter
Returns: Created Users object

update_user_mode()

Location: data/db_service.py:34 Toggles file mode setting.
async def update_user_mode(user_id: int, file_mode: bool) -> None:
    async with await get_session() as db:
        stmt = update(Users).where(Users.user_id == user_id).values(file_mode=file_mode)
        await db.execute(stmt)
        await db.commit()

update_user_lang()

Location: data/db_service.py:159 Updates user language.
async def update_user_lang(user_id: int, lang: str) -> None:
    async with await get_session() as db:
        stmt = update(Users).where(Users.user_id == user_id).values(lang=lang)
        await db.execute(stmt)
        await db.commit()

get_user_settings()

Location: data/db_service.py:133 Retrieves user language and file mode.
async def get_user_settings(user_id: int) -> Optional[Tuple[str, bool]]:
    async with await get_session() as db:
        stmt = select(Users.lang, Users.file_mode).where(Users.user_id == user_id)
        result = await db.execute(stmt)
        user = result.first()
        if user:
            return user[0], bool(user[1])  # (lang, file_mode)
        return None
Returns: (lang, file_mode) tuple or None Example:
from data.db_service import get_user_settings

settings = await get_user_settings(message.chat.id)
if settings:
    lang, file_mode = settings
else:
    # New user - register them
    lang = await lang_func(chat_id, message.from_user.language_code)
    file_mode = False

Video/Download Tracking

add_video()

Location: data/db_service.py:143 Records video/slideshow download.
async def add_video(
    user_id: int, 
    video_link: str, 
    is_images: bool, 
    is_processed: bool = False, 
    is_inline: bool = False
) -> None:
    async with await get_session() as db:
        video = Video(
            user_id=user_id, 
            downloaded_at=int(datetime.now().timestamp()), 
            video_link=video_link,
            is_images=is_images, 
            is_processed=is_processed, 
            is_inline=is_inline
        )
        db.add(video)
        await db.commit()
Parameters:
  • user_id - Telegram user/chat ID
  • video_link - TikTok/Instagram URL
  • is_images - True for slideshows
  • is_processed - True if images were converted
  • is_inline - True if inline query

add_music()

Location: data/db_service.py:152 Records music extraction.
async def add_music(user_id: int, video_id: int) -> None:
    async with await get_session() as db:
        music = Music(
            user_id=user_id, 
            downloaded_at=int(datetime.now().timestamp()), 
            video_id=video_id
        )
        db.add(music)
        await db.commit()

Statistics

get_user_stats()

Location: data/db_service.py:41 Retrieves user statistics.
async def get_user_stats(user_id: int) -> Tuple[Optional[Users], int, int]:
    async with await get_session() as db:
        # Get user
        stmt = select(Users).where(Users.user_id == user_id)
        result = await db.execute(stmt)
        user = result.scalar_one_or_none()
        if not user:
            return None, 0, 0

        # Get video count
        stmt = select(func.count(Video.video_link)).where(Video.user_id == user_id)
        result = await db.execute(stmt)
        videos_count = result.scalar()

        # Get images count
        stmt = select(func.count(Video.video_link)).where(
            Video.user_id == user_id, Video.is_images == True
        )
        result = await db.execute(stmt)
        images_count = result.scalar()

        return user, videos_count, images_count
Returns: (user, videos_count, images_count) tuple

get_stats_by_period()

Location: data/db_service.py:112 Retrieves downloads within time period.
async def get_stats_by_period(
    period: int = 0, 
    chat_type: str = 'all'
) -> List[Tuple[int, str]]:
    # Returns [(timestamp, video_link), ...]
    # period: seconds (0 = all time)
    # chat_type: 'all', 'users' (>0), 'groups' (<0)
Parameters:
  • period - Time period in seconds (0 for all time)
  • chat_type - Filter by chat type: ‘all’, ‘users’, ‘groups’
Returns: List of (downloaded_at, video_link) tuples

get_referral_stats()

Location: data/db_service.py:70 Top 10 referral links.
async def get_referral_stats() -> List[Tuple[str, int]]:
    # Returns [(link, count), ...]
Returns: List of (link, count) tuples (top 10)

get_other_stats()

Location: data/db_service.py:83 Miscellaneous statistics.
async def get_other_stats() -> Tuple[int, List[Tuple[str, int]], List[Tuple[int, int]]]:
    # Returns (file_mode_count, top_langs, top_users)
Returns: (file_mode_count, top_langs, top_users) tuple
  • file_mode_count - Users with file mode enabled
  • top_langs - [(lang, count), ...]
  • top_users - [(user_id, download_count), ...] (top 10)

should_show_ad()

Location: data/db_service.py:197 Checks if ad should be shown to user.
async def should_show_ad(user_id: int) -> bool:
    async with await get_session() as db:
        stmt = select(Users.ad_count, Users.ad_cooldown).where(Users.user_id == user_id)
        result = await db.execute(stmt)
        ad_count, ad_cooldown = result.first()
        # Show ad if 3+ downloads OR cooldown expired
        return ad_count >= 3 or ad_cooldown < int(datetime.now().timestamp())
Returns: True if ad should be shown

record_ad_show()

Location: data/db_service.py:175 Records ad display and resets counter.
async def record_ad_show(user_id: int) -> None:
    async with await get_session() as db:
        ad_cooldown = int(datetime.now().timestamp()) + 86400  # 24 hours
        await db.execute(
            update(Users)
            .where(Users.user_id == user_id)
            .values(ad_count=1, ad_cooldown=ad_cooldown)
        )
        await db.commit()
Effect: Resets ad_count to 1, sets 24-hour cooldown

increase_ad_count()

Location: data/db_service.py:186 Increments ad counter.
async def increase_ad_count(user_id: int) -> None:
    async with await get_session() as db:
        stmt = select(Users.ad_count).where(Users.user_id == user_id)
        result = await db.execute(stmt)
        ad_count = result.scalar() or 0
        await db.execute(
            update(Users).where(Users.user_id == user_id).values(ad_count=ad_count + 1)
        )
        await db.commit()

Admin Functions

get_user_ids()

Location: data/db_service.py:166 Retrieves all user IDs (for broadcasts).
async def get_user_ids(only_positive: bool = True) -> List[int]:
    async with await get_session() as db:
        stmt = select(Users.user_id)
        if only_positive:
            stmt = stmt.where(Users.user_id > 0)  # Only users, not groups
        result = await db.execute(stmt)
        return [id[0] for id in result.all()]
Parameters:
  • only_positive - If True, exclude groups (negative IDs)
Returns: List of user IDs

get_user_videos()

Location: data/db_service.py:63 Retrieves all videos for a user.
async def get_user_videos(user_id: int) -> List[Tuple[int, str]]:
    async with await get_session() as db:
        stmt = select(Video.downloaded_at, Video.video_link).where(Video.user_id == user_id)
        result = await db.execute(stmt)
        return result.all()
Returns: List of (downloaded_at, video_link) tuples

Usage Examples

User Registration Flow

from data.db_service import get_user_settings, create_user
from misc.utils import lang_func, start_manager

settings = await get_user_settings(message.chat.id)
if not settings:
    # New user - register
    lang = await lang_func(message.chat.id, message.from_user.language_code, True)
    file_mode = False
    await start_manager(message.chat.id, message, lang)
else:
    # Existing user
    lang, file_mode = settings

Download Tracking

from data.db_service import add_video

# After successful download
try:
    await add_video(
        user_id=message.chat.id,
        video_link=video_link,
        is_images=video_info.is_slideshow,
        is_processed=was_processed,
        is_inline=False,
    )
    logger.info(f"Video Download: CHAT {message.chat.id} - VIDEO {video_link}")
except Exception as e:
    logger.error(f"Can't write into database: {e}")
from data.db_service import should_show_ad, record_ad_show, increase_ad_count

if not group_chat:
    if await should_show_ad(message.chat.id):
        # Show ad
        await record_ad_show(message.chat.id)
        await message.answer(
            locale[lang]["ad_support"],
            reply_markup=ad_button,
        )
    else:
        # Increment counter
        await increase_ad_count(message.chat.id)

Best Practices

  1. Always call initialize_database_components() before any DB operations
  2. Use db_service functions - never query models directly
  3. Handle None returns - users may not exist
  4. Use transactions - db_service functions auto-commit
  5. Log database errors - don’t let DB issues crash handlers
  6. Use Unix timestamps - consistent timezone handling
  7. Validate user_id - check for existence before operations
  8. Clean up sessions - async with handles this automatically