sqlalchemy 2.0+

SQLAlchemy 2.0+ Skill

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "sqlalchemy 2.0+" with this command: npx skills add slanycukr/riot-api-project/slanycukr-riot-api-project-sqlalchemy-2-0

SQLAlchemy 2.0+ Skill

Quick Start

Basic Setup

import asyncio
from datetime import datetime

from sqlalchemy import Column, ForeignKey, Integer, String, Table, Text, func, select, update
from sqlalchemy.ext.asyncio import AsyncAttrs, AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, relationship

Base class for models

class Base(AsyncAttrs, DeclarativeBase):
    """Shared declarative base for the models in this guide.

    Combines ``AsyncAttrs`` with ``DeclarativeBase`` and adds nothing of its
    own; every model below subclasses it.
    """

Async engine

# Module-level async engine; the URL selects PostgreSQL through the asyncpg driver.
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

Session factory

# Factory for AsyncSession objects bound to `engine`. expire_on_commit=False is
# passed so objects returned from the helper functions below can still be read
# after their session has committed.
async_session = async_sessionmaker(engine, expire_on_commit=False)

Example model

class User(Base):
    """ORM model mapped to the ``users`` table."""

    # Declarative mapping looks for the dunder attribute ``__tablename__``;
    # a plain ``tablename`` attribute is not recognised as the table name.
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(100))

Basic CRUD Operations

async def create_user(name: str, email: str) -> User:
    """Add a new ``User`` inside a ``session.begin()`` block and return it.

    Args:
        name: Value for the ``name`` column.
        email: Value for the ``email`` column.

    Returns:
        The ``User`` instance that was added to the session.
    """
    new_user = User(name=name, email=email)
    async with async_session() as session, session.begin():
        session.add(new_user)
        # Get the ID
        await session.flush()
        return new_user

async def get_user(user_id: int) -> User | None:
    """Look up one user by primary key.

    Args:
        user_id: Value compared against ``User.id``.

    Returns:
        The matching ``User``, or ``None`` when no row matches.
    """
    query = select(User).where(User.id == user_id)
    async with async_session() as session:
        rows = await session.execute(query)
        return rows.scalar_one_or_none()

async def update_user_email(user_id: int, new_email: str) -> bool:
    """Set a new email on the user with the given ID.

    Args:
        user_id: Value compared against ``User.id``.
        new_email: Replacement value for the ``email`` column.

    Returns:
        ``True`` when the UPDATE reported at least one affected row.
    """
    statement = update(User).where(User.id == user_id).values(email=new_email)
    async with async_session() as session:
        outcome = await session.execute(statement)
        await session.commit()
        return outcome.rowcount > 0

Common Patterns

Models

Annotated Type-Safe Models (Recommended)

from typing_extensions import Annotated from typing import List, Optional

Reusable column types

# Each alias pairs a Python type with a mapped_column() configuration so models
# can write ``Mapped[intpk]`` instead of repeating the column options.
intpk = Annotated[int, mapped_column(primary_key=True)]
str50 = Annotated[str, mapped_column(String(50))]
created_at = Annotated[datetime, mapped_column(insert_default=func.now())]

class Post(Base):
    """ORM model mapped to the ``posts`` table, built from the Annotated aliases."""

    # Must be the dunder ``__tablename__``; a plain ``tablename`` attribute is
    # not picked up by declarative mapping.
    __tablename__ = "posts"

    id: Mapped[intpk]
    title: Mapped[str50]
    content: Mapped[str] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    created: Mapped[created_at]

    # Relationships
    author: Mapped["User"] = relationship(back_populates="posts")
    tags: Mapped[List["Tag"]] = relationship(secondary="post_tags")

Classic Style Models

class Post(Base):
    """``posts`` model written without ``Mapped[...]`` annotations."""

    # Must be the dunder ``__tablename__``; a plain ``tablename`` attribute is
    # not picked up by declarative mapping.
    __tablename__ = "posts"

    id = mapped_column(Integer, primary_key=True)
    title = mapped_column(String(50))
    content = mapped_column(Text)
    author_id = mapped_column(ForeignKey("users.id"))

    author = relationship("User", back_populates="posts")

Relationships

One-to-Many

class User(Base):
    """Parent side of the one-to-many example: one user, many posts."""

    # Must be the dunder ``__tablename__``; a plain ``tablename`` attribute is
    # not picked up by declarative mapping.
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    posts: Mapped[List["Post"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
    )

class Post(Base):
    """Child side of the one-to-many example; ``author_id`` references ``users.id``."""

    # Must be the dunder ``__tablename__``; a plain ``tablename`` attribute is
    # not picked up by declarative mapping.
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    author: Mapped["User"] = relationship(back_populates="posts")

Many-to-Many

# Association table for the Post <-> Tag many-to-many link; the two foreign
# keys together form the primary key.
association_table = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", ForeignKey("tags.id"), primary_key=True),
)

class Post(Base):
    """Many-to-many example: a post linked to tags through ``association_table``."""

    # Must be the dunder ``__tablename__``; a plain ``tablename`` attribute is
    # not picked up by declarative mapping.
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    tags: Mapped[List["Tag"]] = relationship(
        secondary=association_table,
        back_populates="posts",
    )

class Tag(Base):
    """Many-to-many example: a uniquely named tag linked back to posts."""

    # Must be the dunder ``__tablename__``; a plain ``tablename`` attribute is
    # not picked up by declarative mapping.
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    posts: Mapped[List["Post"]] = relationship(
        secondary=association_table,
        back_populates="tags",
    )

Queries

Basic Select

from sqlalchemy import select, and_, or_

Get all users

async def get_all_users():
    """Return every ``User`` row as ORM objects."""
    query = select(User)
    async with async_session() as session:
        rows = await session.execute(query)
        return rows.scalars().all()

Filter with conditions

async def get_users_by_name(name: str):
    """Return users whose name matches ``%name%`` under ``ilike``.

    Args:
        name: Substring wrapped in ``%`` wildcards and compared to ``User.name``.
    """
    pattern = f"%{name}%"
    query = select(User).where(User.name.ilike(pattern))
    async with async_session() as session:
        rows = await session.execute(query)
        return rows.scalars().all()

Complex conditions

async def search_users(name: str | None = None, email: str | None = None):
    """Search users by optional name and/or email substrings.

    Args:
        name: When truthy, adds an ``ilike`` filter ``%name%`` on ``User.name``.
        email: When truthy, adds an ``ilike`` filter ``%email%`` on ``User.email``.

    Returns:
        The matching users; every user when neither filter is supplied.
    """
    filters = []
    if name:
        filters.append(User.name.ilike(f"%{name}%"))
    if email:
        filters.append(User.email.ilike(f"%{email}%"))

    # Start from the unfiltered query and narrow it only when a filter exists,
    # so an empty filter list never reaches and_().
    stmt = select(User)
    if filters:
        stmt = stmt.where(and_(*filters))

    async with async_session() as session:
        result = await session.execute(stmt)
        return result.scalars().all()

Relationship Loading

from sqlalchemy.orm import selectinload, joinedload

Eager load relationships

async def get_posts_with_author():
    """Return all posts with the ``selectinload(Post.author)`` option applied."""
    query = select(Post).options(selectinload(Post.author))
    async with async_session() as session:
        fetched = await session.execute(query)
        return fetched.scalars().all()

Joined loading for single relationships

async def get_post_with_tags(post_id: int):
    """Fetch one post by ID with loader options for its author and tags.

    Args:
        post_id: Value compared against ``Post.id``.

    Returns:
        The matching ``Post``, or ``None`` when no row matches.
    """
    loader_options = (joinedload(Post.author), selectinload(Post.tags))
    query = select(Post).options(*loader_options).where(Post.id == post_id)
    async with async_session() as session:
        fetched = await session.execute(query)
        return fetched.scalar_one_or_none()

Pagination

async def get_posts_paginated(page: int, size: int):
    """Return one page of posts ordered by ``Post.created`` descending.

    Args:
        page: 1-based page number.
        size: Maximum number of posts on the page.

    Returns:
        The posts on the requested page.

    Raises:
        ValueError: If ``page`` is below 1 or ``size`` is negative. Either
            would produce a negative OFFSET or LIMIT in the query.
    """
    # Validate before opening a session so bad input fails fast and clearly.
    if page < 1:
        raise ValueError(f"page must be >= 1, got {page}")
    if size < 0:
        raise ValueError(f"size must be >= 0, got {size}")

    offset = (page - 1) * size
    async with async_session() as session:
        stmt = (
            select(Post)
            .order_by(Post.created.desc())
            .offset(offset)
            .limit(size)
        )
        result = await session.execute(stmt)
        return result.scalars().all()

Aggregations

from sqlalchemy import func

async def get_user_post_count():
    """Return ``(name, post_count)`` rows, ordered by post count descending."""
    post_total = func.count(Post.id)
    query = (
        select(User.name, post_total.label("post_count"))
        .join(Post)
        .group_by(User.id, User.name)
        .order_by(post_total.desc())
    )
    async with async_session() as session:
        rows = await session.execute(query)
        return rows.all()

Sessions Management

Context Manager Pattern

async def create_post(title: str, content: str, author_id: int):
    """Add a new ``Post`` inside a ``session.begin()`` block and return it.

    Args:
        title: Value for the ``title`` column.
        content: Value for the ``content`` column.
        author_id: Value for the ``author_id`` column.
    """
    new_post = Post(title=title, content=content, author_id=author_id)
    async with async_session() as session:
        async with session.begin():
            session.add(new_post)
    return new_post

Dependency Injection (FastAPI)

from fastapi import Depends

async def get_db_session():
    """Yield a session for use as a FastAPI dependency.

    Yields:
        A session created by ``async_session``.
    """
    # The ``async with`` block already closes the session on exit, including
    # when the consumer raises, so no extra try/finally close is needed.
    async with async_session() as session:
        yield session

async def create_user_endpoint(
    user_data: UserCreate,
    session: AsyncSession = Depends(get_db_session),
):
    """Create a user from the request payload and return the refreshed row.

    Args:
        user_data: Payload whose fields become ``User`` constructor keywords.
        session: Session injected through ``Depends(get_db_session)``.
    """
    # NOTE(review): ``.dict()`` is deprecated in Pydantic v2 in favour of
    # ``.model_dump()`` — confirm which Pydantic version UserCreate uses.
    fields = user_data.dict()
    new_user = User(**fields)
    session.add(new_user)
    await session.commit()
    await session.refresh(new_user)
    return new_user

Scoped Sessions

from sqlalchemy.ext.asyncio import async_scoped_session import asyncio

Create scoped session

# Scoped-session registry keyed by the current asyncio task.
_scoped_factory = async_sessionmaker(engine, expire_on_commit=False)
async_session_scope = async_scoped_session(
    _scoped_factory,
    scopefunc=asyncio.current_task,
)

Use in application

async def some_function():
    """Use the task-scoped session, then release it from the registry."""
    session = async_session_scope()
    try:
        # Use session normally
        await session.commit()
    finally:
        # The registry is keyed by the current task, so the entry must be
        # removed explicitly; otherwise the task handle and its session stay
        # referenced after the task finishes.
        await async_session_scope.remove()

Advanced Patterns

Write-Only Relationships (Memory Efficient)

from sqlalchemy.orm import WriteOnlyMapped

class User(Base):
    """``users`` model whose ``posts`` relationship is declared write-only."""

    # Must be the dunder ``__tablename__``; a plain ``tablename`` attribute is
    # not picked up by declarative mapping.
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    posts: WriteOnlyMapped["Post"] = relationship()

async def get_user_posts(user_id: int):
    """Return the posts authored by a user, or ``[]`` when the user is missing.

    Args:
        user_id: Primary key of the user; also compared to ``Post.author_id``.
    """
    async with async_session() as session:
        owner = await session.get(User, user_id)
        if not owner:
            return []
        # Explicit select for collection
        posts_query = select(Post).where(Post.author_id == user_id)
        rows = await session.execute(posts_query)
        return rows.scalars().all()

Custom Session Classes

class AsyncSessionWithDefaults(AsyncSession):
    """``AsyncSession`` subclass offering an ``execute`` wrapper as a hook point."""

    async def execute_with_defaults(self, statement, **kwargs):
        """Forward ``statement`` and ``kwargs`` to ``execute`` and return its result."""
        # Add default options
        outcome = await self.execute(statement, **kwargs)
        return outcome

Use custom session

# Session factory that produces AsyncSessionWithDefaults instances.
async_session = async_sessionmaker(
    engine,
    class_=AsyncSessionWithDefaults,
    expire_on_commit=False,
)

Connection Routing

class RoutingSession(Session):
    """Sync session that picks an engine based on the mapped class."""

    def get_bind(self, mapper=None, clause=None, **kw):
        """Return ``read_engine`` for ``ReadOnlyModel`` subclasses, else ``write_engine``."""
        use_read_engine = mapper and issubclass(mapper.class_, ReadOnlyModel)
        return read_engine if use_read_engine else write_engine

class AsyncRoutingSession(AsyncSession):
    """Async session whose underlying sync session class is ``RoutingSession``."""

    sync_session_class = RoutingSession

Raw SQL

from sqlalchemy import text

async def run_raw_sql():
    """Run ``SELECT COUNT(*) FROM users`` and return its scalar result."""
    count_query = text("SELECT COUNT(*) FROM users")
    async with async_session() as session:
        outcome = await session.execute(count_query)
        return outcome.scalar()

async def run_parameterized_query(user_id: int):
    """Fetch all ``posts`` rows for an author using a bound ``:user_id`` parameter.

    Args:
        user_id: Value bound to the ``:user_id`` placeholder.
    """
    stmt = text("SELECT * FROM posts WHERE author_id = :user_id")
    bound_params = {"user_id": user_id}
    async with async_session() as session:
        outcome = await session.execute(stmt, bound_params)
        return outcome.fetchall()

Performance Tips

  • Use selectinload for collections: More efficient than lazy loading

  • Batch operations: Use add_all() for bulk inserts

  • Connection pooling: Configure pool size based on load

  • Index columns: Add indexes for frequently queried columns

  • Use streaming: For large result sets, use stream()

Streaming large results

async def process_all_users():
    """Stream every ``User`` and pass each one to ``process_user``."""
    all_users = select(User)
    async with async_session() as session:
        streamed = await session.stream(all_users)
        # Process user without loading all into memory
        async for user in streamed.scalars():
            await process_user(user)

Requirements

pip install sqlalchemy[asyncio] # Core SQLAlchemy
pip install asyncpg # PostgreSQL async driver

or

pip install aiosqlite # SQLite async driver

or

pip install aiomysql # MySQL async driver

Database URLs

  • PostgreSQL: postgresql+asyncpg://user:pass@localhost/db

  • SQLite: sqlite+aiosqlite:///database.db

  • MySQL: mysql+aiomysql://user:pass@localhost/db

Migration Integration

Use Alembic for database migrations:

Generate migration

alembic revision --autogenerate -m "Add users table"

Apply migrations

alembic upgrade head

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

axios

No summary provided by upstream source.

Repository Source · Needs Review
General

radix-ui

No summary provided by upstream source.

Repository Source · Needs Review
General

httpx

No summary provided by upstream source.

Repository Source · Needs Review