service

Database Service for Tux Bot.

This module provides a clean, maintainable database service for async PostgreSQL operations.

Key Principles:

  • Async-first design

  • Connection pooling with retry logic

  • Type-safe interfaces

  • Automatic reconnection handling

Classes

DatabaseService

Python
DatabaseService(echo: bool = False)

Async database service for PostgreSQL.

Provides connection management, session handling, query execution with retry logic, and health checks for the PostgreSQL database.

Attributes:

  • _engine (AsyncEngine | None) –

    SQLAlchemy async engine for database connections.

  • _session_factory (async_sessionmaker[AsyncSession] | None) –

    Factory for creating database sessions.

  • _echo (bool) –

    Whether to log SQL queries (useful for debugging).

Initialize the database service.

Parameters:

  • echo (bool, default: False ) –

    Whether to enable SQL query logging.

Methods:

  • connect

    Connect to the PostgreSQL database.

  • disconnect

    Disconnect from the database and dispose of the connection pool.

  • is_connected

    Check whether the database is currently connected.

  • test_connection

    Test database connectivity with a simple query.

  • session

    Get a database session context manager.

  • execute_transaction

    Execute a callback inside a database transaction.

  • execute_query

    Execute a database operation with automatic retry logic.

  • health_check

    Perform a database health check.

  • validate_schema

    Validate that the database schema matches the current model definitions.

Attributes

engine property
Python
engine: AsyncEngine | None

Get the database engine.

Returns:

  • AsyncEngine | None

    The SQLAlchemy async engine, or None if not connected.

Notes

Primarily used for testing and advanced operations.

Functions

connect async
Python
connect(database_url: str, **kwargs: Any) -> None

Connect to the PostgreSQL database.

Parameters:

  • database_url (str) –

    PostgreSQL connection URL in the format postgresql+psycopg://user:password@host:port/database

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to create_async_engine.
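
Credentials that contain characters such as `@` or `/` can break a URL in the format above unless they are percent-encoded. The sketch below shows one way to assemble such a URL with the standard library. `build_database_url` is a hypothetical helper, not part of this module.

```python
from urllib.parse import quote


def build_database_url(
    user: str,
    password: str,
    host: str,
    port: int,
    database: str,
) -> str:
    """Build a URL in the documented postgresql+psycopg:// format."""
    # Percent-encode credentials so characters such as "@" or "/" in a
    # password are not mistaken for URL delimiters.
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")
    return f"postgresql+psycopg://{safe_user}:{safe_password}@{host}:{port}/{database}"
```

The resulting string can then be passed as `database_url` to `connect`.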

disconnect async
Python
disconnect() -> None

Disconnect from the database and dispose of the connection pool.

is_connected
Python
is_connected() -> bool

Check whether the database is currently connected.

Returns:

  • bool

    True if connected, False otherwise.

test_connection async
Python
test_connection() -> None

Test database connectivity with a simple query.

Raises:

  • Exception

    If the database connection fails or the test query fails.

session async
Python
session() -> AsyncGenerator[AsyncSession]

Get a database session context manager.

Automatically handles connection, commit, and rollback.

Yields:

  • AsyncSession

    An active database session.

Examples:

Python Console Session
>>> async with db.session() as session:
...     result = await session.execute(select(User))
...     users = result.scalars().all()
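
The commit-on-success, rollback-on-error behaviour described for `session` can be illustrated with a small stand-in. This is a sketch of the general pattern, not the service's actual implementation. `FakeSession` and `managed_session` are hypothetical names, and the fake records calls instead of touching a database.

```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in that records calls instead of using a database."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def commit(self) -> None:
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


@asynccontextmanager
async def managed_session(session: FakeSession) -> AsyncIterator[FakeSession]:
    """Commit if the block succeeds; roll back and re-raise if it fails."""
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        # The session is closed on both paths.
        await session.close()
```

With this shape, code inside the `async with` block never needs to call `commit` or `rollback` itself.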

execute_transaction async
Python
execute_transaction(callback: Callable[[], Any]) -> Any

Execute a callback inside a database transaction.

Parameters:

  • callback (Callable[[], Any]) –

    Async function to execute within the transaction.

Returns:

  • Any

    The return value of the callback function.

Notes

If the transaction fails, it will be rolled back automatically.
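
Because the documented callback type is `Callable[[], Any]`, any arguments the work needs have to be bound before the callback is handed over. The sketch below shows one way to do that with `functools.partial`. `run_in_transaction` is a hypothetical stand-in for `execute_transaction` that only awaits the callback, and `rename_user` is an illustrative unit of work that does not touch a database.

```python
import asyncio
from functools import partial
from typing import Any, Awaitable, Callable


async def run_in_transaction(callback: Callable[[], Awaitable[Any]]) -> Any:
    """Hypothetical stand-in for execute_transaction: awaits the callback."""
    return await callback()


async def rename_user(user_id: int, new_name: str) -> str:
    """Hypothetical unit of work; a real one would use a database session."""
    return f"user {user_id} renamed to {new_name}"


async def main() -> str:
    # partial() turns the two-argument coroutine function into a
    # zero-argument callable, matching Callable[[], Any].
    callback = partial(rename_user, 7, "tux")
    return await run_in_transaction(callback)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

A closure or a `lambda` that returns the coroutine would work equally well. `partial` just keeps the bound arguments explicit.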

execute_query async
Python
execute_query(operation: Callable[[AsyncSession], Awaitable[T]], span_desc: str) -> T

Execute a database operation with automatic retry logic.

Parameters:

  • operation (Callable[[AsyncSession], Awaitable[T]]) –

    Async function that performs database operations.

  • span_desc (str) –

    Description for Sentry performance monitoring.

Returns:

  • T

    Result of the operation.

Notes

Retries the operation automatically on transient failures.

_execute_with_retry async
Python
_execute_with_retry(
    operation: Callable[[AsyncSession], Awaitable[T]],
    span_desc: str,
    max_retries: int = 3,
    backoff_factor: float = 0.5,
) -> T

Execute a database operation with exponential backoff retry logic.

Parameters:

  • operation (Callable[[AsyncSession], Awaitable[T]]) –

    Database operation to execute.

  • span_desc (str) –

    Description for monitoring/logging.

  • max_retries (int, default: 3 ) –

    Maximum number of retry attempts.

  • backoff_factor (float, default: 0.5 ) –

    Multiplier for exponential backoff.

Returns:

  • T

    Result of the operation.

Raises:

  • TimeoutError

    If the operation times out after all retries.

  • DisconnectionError

    If the database is still disconnected after all retries.

  • OperationalError

    If a database operational error persists after all retries.

  • RuntimeError

    If the retry loop completes unexpectedly without return or exception.
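
The retry behaviour described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the method's actual implementation. It assumes that `max_retries` counts total attempts and that the delay is `backoff_factor * 2**attempt`. The built-in `TimeoutError` and `ConnectionError` stand in for the database exceptions, and `retry_with_backoff` is a hypothetical name.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    backoff_factor: float = 0.5,
    retry_on: tuple[type[Exception], ...] = (TimeoutError, ConnectionError),
) -> T:
    """Retry `operation`, sleeping backoff_factor * 2**attempt between tries."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            # Assumed schedule with the defaults: 0.5 s after the first
            # failure, then 1.0 s after the second.
            await asyncio.sleep(backoff_factor * (2 ** attempt))
    # Reached only when max_retries <= 0, mirroring the documented RuntimeError.
    raise RuntimeError("retry loop finished without a result")
```

Exceptions outside `retry_on` propagate immediately, so only failures considered transient are retried.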

health_check async
Python
health_check() -> dict[str, Any]

Perform a database health check.

Returns:

  • dict[str, Any]

    Health check result with status and optional error message. Status can be: "healthy", "unhealthy", or "disconnected".

Examples:

Python Console Session
>>> result = await db.health_check()
>>> print(result)
{'status': 'healthy', 'mode': 'async'}
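
A caller will usually branch on the `status` value. The sketch below maps the three documented statuses to a short summary. `describe_health` is a hypothetical helper, and `"error"` is an assumed key name for the optional error message, since the key is not named here.

```python
from typing import Any


def describe_health(result: dict[str, Any]) -> str:
    """Turn a health_check()-style result into a one-line summary."""
    status = result.get("status")
    if status == "healthy":
        return "database OK"
    if status == "disconnected":
        return "database not connected"
    if status == "unhealthy":
        # "error" is an assumed key name for the optional error message.
        return f"database unhealthy: {result.get('error', 'no details')}"
    return f"unknown status: {status!r}"
```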

validate_schema async
Python
validate_schema() -> dict[str, Any]

Validate that the database schema matches the current model definitions.

Uses SQLAlchemy's metadata reflection to compare the actual database schema with the defined model metadata. This is more efficient and accurate than querying the schema manually.

Returns:

  • dict[str, Any]

    Schema validation result with status and optional error message. Status can be: "valid", "invalid", or "error".

Examples:

Python Console Session
>>> result = await db.validate_schema()
>>> print(result)
{'status': 'valid', 'mode': 'async'}
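
The comparison behind schema validation can be illustrated without a database. In the sketch below, plain dictionaries mapping table names to column names stand in for the model metadata and the reflected schema. The real method uses SQLAlchemy reflection, which this example does not reproduce. `compare_schemas` is a hypothetical name, and `"error"` is an assumed key for the optional message.

```python
from typing import Any


def compare_schemas(
    expected: dict[str, set[str]],
    actual: dict[str, set[str]],
) -> dict[str, Any]:
    """Report tables and columns that are expected but absent from `actual`."""
    problems: list[str] = []
    for table, columns in sorted(expected.items()):
        if table not in actual:
            problems.append(f"missing table: {table}")
            continue
        missing = sorted(columns - actual[table])
        if missing:
            problems.append(f"{table} is missing columns: {', '.join(missing)}")
    if problems:
        # "error" is an assumed key name for the optional message.
        return {"status": "invalid", "error": "; ".join(problems)}
    return {"status": "valid"}
```

Extra tables or columns in the database are ignored in this sketch. A stricter check could report them as well.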