sqlalchemy-orm

progressive_disclosure: entry_point: summary: "Python SQL toolkit and ORM with powerful query builder and relationship mapping" when_to_use: - "When building Python applications with databases" - "When needing complex SQL queries with type safety" - "When working with FastAPI/Flask/Django" - "When needing database migrations (Alembic)" quick_start: - "pip install sqlalchemy" - "Define models with declarative base" - "Create engine and session" - "Query with select() and commit()" token_estimate: entry: 70-85 full: 4500-5500

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "sqlalchemy-orm" with this command: npx skills add bobmatnyc/claude-mpm-skills/bobmatnyc-claude-mpm-skills-sqlalchemy-orm

SQLAlchemy ORM Skill

progressive_disclosure: entry_point: summary: "Python SQL toolkit and ORM with powerful query builder and relationship mapping" when_to_use:

  • "When building Python applications with databases"
  • "When needing complex SQL queries with type safety"
  • "When working with FastAPI/Flask/Django"
  • "When needing database migrations (Alembic)" quick_start:
  • "pip install sqlalchemy"
  • "Define models with declarative base"
  • "Create engine and session"
  • "Query with select() and commit()" token_estimate: entry: 70-85 full: 4500-5500

Core Concepts

SQLAlchemy 2.0 Modern API

SQLAlchemy 2.0 introduced modern patterns with better type hints, improved query syntax, and async support.

Key Changes from 1.x:

  • select() instead of Query

  • Mapped[T] and mapped_column() for type hints

  • Explicit Session.execute() for queries

  • Better async support with AsyncSession

Installation

Core SQLAlchemy

pip install sqlalchemy

With async support

pip install sqlalchemy[asyncio] aiosqlite # SQLite pip install sqlalchemy[asyncio] asyncpg # PostgreSQL

With Alembic for migrations

pip install alembic

FastAPI integration

pip install fastapi sqlalchemy

Declarative Models (SQLAlchemy 2.0)

Basic Model Definition

from datetime import datetime from typing import Optional from sqlalchemy import String, DateTime, ForeignKey, func from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

Base class for all models

class Base(DeclarativeBase): pass

User model with type hints

class User(Base): tablename = "users"

# Primary key
id: Mapped[int] = mapped_column(primary_key=True)

# Required fields
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
hashed_password: Mapped[str] = mapped_column(String(255))

# Optional fields
full_name: Mapped[Optional[str]] = mapped_column(String(100))
is_active: Mapped[bool] = mapped_column(default=True)

# Timestamps with server defaults
created_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True),
    server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True),
    server_default=func.now(),
    onupdate=func.now()
)

# Relationships
posts: Mapped[list["Post"]] = relationship(back_populates="author")

def __repr__(self) -> str:
    return f"User(id={self.id}, email={self.email})"

Relationships

One-to-Many:

class Post(Base): tablename = "posts"

id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str]
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

# Relationship with back_populates
author: Mapped["User"] = relationship(back_populates="posts")
tags: Mapped[list["Tag"]] = relationship(
    secondary="post_tags",
    back_populates="posts"
)

Many-to-Many:

from sqlalchemy import Table, Column, Integer, ForeignKey

Association table

post_tags = Table( "post_tags", Base.metadata, Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True), Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True) )

class Tag(Base): tablename = "tags"

id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)

posts: Mapped[list["Post"]] = relationship(
    secondary=post_tags,
    back_populates="tags"
)

Database Setup

Engine and Session Configuration

from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, Session from sqlalchemy.pool import QueuePool

Database URL formats

SQLite: sqlite:///./database.db

PostgreSQL: postgresql://user:pass@localhost/dbname

MySQL: mysql+pymysql://user:pass@localhost/dbname

DATABASE_URL = "postgresql://user:pass@localhost/mydb"

Create engine with connection pooling

engine = create_engine( DATABASE_URL, poolclass=QueuePool, pool_size=5, max_overflow=10, pool_pre_ping=True, # Check connection before using echo=False # Set True for SQL logging )

Session factory

SessionLocal = sessionmaker( bind=engine, autocommit=False, autoflush=False, expire_on_commit=False )

Create tables

Base.metadata.create_all(bind=engine)

Dependency Injection (FastAPI Pattern)

from typing import Generator

def get_db() -> Generator[Session, None, None]: """Database session dependency for FastAPI.""" db = SessionLocal() try: yield db finally: db.close()

Usage in FastAPI

from fastapi import Depends

@app.get("/users/{user_id}") def get_user(user_id: int, db: Session = Depends(get_db)): return db.execute( select(User).where(User.id == user_id) ).scalar_one_or_none()

Query Patterns (SQLAlchemy 2.0)

Select Queries

from sqlalchemy import select, and_, or_, desc, func

Basic select

stmt = select(User).where(User.email == "user@example.com") user = session.execute(stmt).scalar_one_or_none()

Multiple conditions

stmt = select(User).where( and_( User.is_active == True, User.created_at > datetime(2024, 1, 1) ) ) users = session.execute(stmt).scalars().all()

OR conditions

stmt = select(User).where( or_( User.email.like("%@gmail.com"), User.email.like("%@yahoo.com") ) )

Ordering and limiting

stmt = ( select(User) .where(User.is_active == True) .order_by(desc(User.created_at)) .limit(10) .offset(20) )

Counting

stmt = select(func.count()).select_from(User) count = session.execute(stmt).scalar()

Joins

Inner join

stmt = ( select(Post, User) .join(User, Post.user_id == User.id) .where(User.is_active == True) ) results = session.execute(stmt).all()

Left outer join

stmt = ( select(User, func.count(Post.id).label("post_count")) .outerjoin(Post) .group_by(User.id) )

Multiple joins

stmt = ( select(Post) .join(Post.author) .join(Post.tags) .where(Tag.name == "python") )

Eager Loading (Solve N+1 Problem)

from sqlalchemy.orm import selectinload, joinedload

selectinload - separate query (better for collections)

stmt = select(User).options(selectinload(User.posts)) users = session.execute(stmt).scalars().all()

Now users[0].posts won't trigger additional queries

joinedload - single query with join (better for one-to-one)

stmt = select(Post).options(joinedload(Post.author)) posts = session.execute(stmt).unique().scalars().all()

Nested eager loading

stmt = select(User).options( selectinload(User.posts).selectinload(Post.tags) )

Load only specific columns

from sqlalchemy.orm import load_only stmt = select(User).options(load_only(User.id, User.email))

CRUD Operations

Create

def create_user(db: Session, email: str, username: str, password: str): """Create new user.""" user = User( email=email, username=username, hashed_password=hash_password(password) ) db.add(user) db.commit() db.refresh(user) # Get updated fields (id, timestamps) return user

Bulk insert

users = [ User(email=f"user{i}@example.com", username=f"user{i}") for i in range(100) ] db.add_all(users) db.commit()

Read

def get_user_by_email(db: Session, email: str) -> Optional[User]: """Get user by email.""" stmt = select(User).where(User.email == email) return db.execute(stmt).scalar_one_or_none()

def get_users( db: Session, skip: int = 0, limit: int = 100 ) -> list[User]: """Get paginated users.""" stmt = ( select(User) .where(User.is_active == True) .order_by(User.created_at.desc()) .offset(skip) .limit(limit) ) return db.execute(stmt).scalars().all()

Update

def update_user(db: Session, user_id: int, **kwargs): """Update user fields.""" stmt = select(User).where(User.id == user_id) user = db.execute(stmt).scalar_one_or_none()

if not user:
    return None

for key, value in kwargs.items():
    setattr(user, key, value)

db.commit()
db.refresh(user)
return user

Bulk update

from sqlalchemy import update

stmt = ( update(User) .where(User.is_active == False) .values(deleted_at=datetime.utcnow()) ) db.execute(stmt) db.commit()

Delete

def delete_user(db: Session, user_id: int) -> bool: """Delete user.""" stmt = select(User).where(User.id == user_id) user = db.execute(stmt).scalar_one_or_none()

if not user:
    return False

db.delete(user)
db.commit()
return True

Bulk delete

from sqlalchemy import delete

stmt = delete(User).where(User.is_active == False) db.execute(stmt) db.commit()

Transactions and Session Management

Context Manager Pattern

from contextlib import contextmanager

@contextmanager def get_db_session(): """Session context manager.""" session = SessionLocal() try: yield session session.commit() except Exception: session.rollback() raise finally: session.close()

Usage

with get_db_session() as db: user = create_user(db, "test@example.com", "testuser", "password") # Auto-commits on success, rollback on exception

Manual Transaction Control

def transfer_money(db: Session, from_user_id: int, to_user_id: int, amount: float): """Transfer money between users with transaction.""" try: # Begin nested transaction with db.begin_nested(): # Deduct from sender stmt = select(User).where(User.id == from_user_id).with_for_update() sender = db.execute(stmt).scalar_one() sender.balance -= amount

        # Add to receiver
        stmt = select(User).where(User.id == to_user_id).with_for_update()
        receiver = db.execute(stmt).scalar_one()
        receiver.balance += amount

    db.commit()
except Exception as e:
    db.rollback()
    raise

Async SQLAlchemy

Async Setup

from sqlalchemy.ext.asyncio import ( create_async_engine, AsyncSession, async_sessionmaker )

Async engine (note: asyncpg for PostgreSQL, aiosqlite for SQLite)

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"

async_engine = create_async_engine( DATABASE_URL, echo=False, pool_size=5, max_overflow=10 )

Async session factory

AsyncSessionLocal = async_sessionmaker( async_engine, class_=AsyncSession, expire_on_commit=False )

Create tables

async def init_db(): async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all)

Async CRUD Operations

async def get_user_async(user_id: int) -> Optional[User]: """Get user asynchronously.""" async with AsyncSessionLocal() as session: stmt = select(User).where(User.id == user_id) result = await session.execute(stmt) return result.scalar_one_or_none()

async def create_user_async(email: str, username: str) -> User: """Create user asynchronously.""" async with AsyncSessionLocal() as session: user = User(email=email, username=username) session.add(user) await session.commit() await session.refresh(user) return user

FastAPI async dependency

async def get_async_db(): async with AsyncSessionLocal() as session: yield session

@app.get("/users/{user_id}") async def get_user_endpoint( user_id: int, db: AsyncSession = Depends(get_async_db) ): stmt = select(User).where(User.id == user_id) result = await db.execute(stmt) return result.scalar_one_or_none()

Alembic Migrations

Setup Alembic

Initialize Alembic

alembic init alembic

Edit alembic.ini - set database URL

sqlalchemy.url = postgresql://user:pass@localhost/mydb

Or use env variable in alembic/env.py

Configure Alembic

alembic/env.py

from sqlalchemy import engine_from_config, pool from alembic import context from myapp.models import Base # Import your Base

Add your model's MetaData

target_metadata = Base.metadata

def run_migrations_online(): """Run migrations in 'online' mode.""" configuration = config.get_section(config.config_ini_section) configuration["sqlalchemy.url"] = os.getenv("DATABASE_URL")

connectable = engine_from_config(
    configuration,
    prefix="sqlalchemy.",
    poolclass=pool.NullPool,
)

with connectable.connect() as connection:
    context.configure(
        connection=connection,
        target_metadata=target_metadata
    )

    with context.begin_transaction():
        context.run_migrations()

Create and Apply Migrations

Auto-generate migration from model changes

alembic revision --autogenerate -m "Add users table"

Review generated migration in alembic/versions/

Apply migration

alembic upgrade head

Rollback one version

alembic downgrade -1

Show current version

alembic current

Show migration history

alembic history

Manual Migration Example

alembic/versions/xxx_add_users.py

from alembic import op import sqlalchemy as sa

def upgrade(): op.create_table( 'users', sa.Column('id', sa.Integer(), nullable=False), sa.Column('email', sa.String(255), nullable=False), sa.Column('username', sa.String(50), nullable=False), sa.PrimaryKeyConstraint('id') ) op.create_index('ix_users_email', 'users', ['email'], unique=True)

def downgrade(): op.drop_index('ix_users_email', table_name='users') op.drop_table('users')

FastAPI Integration

Complete FastAPI Example

from fastapi import FastAPI, Depends, HTTPException, status from sqlalchemy.orm import Session from pydantic import BaseModel, EmailStr from typing import List

app = FastAPI()

Pydantic schemas

class UserBase(BaseModel): email: EmailStr username: str

class UserCreate(UserBase): password: str

class UserResponse(UserBase): id: int is_active: bool created_at: datetime

class Config:
    from_attributes = True  # SQLAlchemy 2.0 (was orm_mode)

CRUD operations

@app.post("/users/", response_model=UserResponse, status_code=status.HTTP_201_CREATED) def create_user_endpoint(user: UserCreate, db: Session = Depends(get_db)): # Check if user exists stmt = select(User).where(User.email == user.email) if db.execute(stmt).scalar_one_or_none(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered" )

# Create user
db_user = User(
    email=user.email,
    username=user.username,
    hashed_password=hash_password(user.password)
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user

@app.get("/users/{user_id}", response_model=UserResponse) def read_user(user_id: int, db: Session = Depends(get_db)): stmt = select(User).where(User.id == user_id) user = db.execute(stmt).scalar_one_or_none()

if not user:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )
return user

@app.get("/users/", response_model=List[UserResponse]) def list_users( skip: int = 0, limit: int = 100, db: Session = Depends(get_db) ): stmt = ( select(User) .where(User.is_active == True) .offset(skip) .limit(limit) ) return db.execute(stmt).scalars().all()

@app.put("/users/{user_id}", response_model=UserResponse) def update_user( user_id: int, user_update: UserBase, db: Session = Depends(get_db) ): stmt = select(User).where(User.id == user_id) db_user = db.execute(stmt).scalar_one_or_none()

if not db_user:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )

db_user.email = user_update.email
db_user.username = user_update.username
db.commit()
db.refresh(db_user)
return db_user

@app.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_user(user_id: int, db: Session = Depends(get_db)): stmt = select(User).where(User.id == user_id) db_user = db.execute(stmt).scalar_one_or_none()

if not db_user:
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found"
    )

db.delete(db_user)
db.commit()

Testing with Pytest

Test Database Setup

import pytest from sqlalchemy import create_engine, StaticPool from sqlalchemy.orm import sessionmaker

In-memory SQLite for testing

SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"

@pytest.fixture(scope="function") def db_session(): """Create test database session.""" engine = create_engine( SQLALCHEMY_TEST_DATABASE_URL, connect_args={"check_same_thread": False}, poolclass=StaticPool, )

# Create tables
Base.metadata.create_all(bind=engine)

TestingSessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine
)

session = TestingSessionLocal()
try:
    yield session
finally:
    session.close()
    Base.metadata.drop_all(bind=engine)

@pytest.fixture(scope="function") def test_user(db_session): """Create test user.""" user = User( email="test@example.com", username="testuser", hashed_password="hashed" ) db_session.add(user) db_session.commit() db_session.refresh(user) return user

Test Examples

def test_create_user(db_session): """Test user creation.""" user = User(email="new@example.com", username="newuser") db_session.add(user) db_session.commit()

assert user.id is not None
assert user.email == "new@example.com"
assert user.created_at is not None

def test_query_user(db_session, test_user): """Test user query.""" stmt = select(User).where(User.email == "test@example.com") found_user = db_session.execute(stmt).scalar_one()

assert found_user.id == test_user.id
assert found_user.username == test_user.username

def test_update_user(db_session, test_user): """Test user update.""" test_user.username = "updated" db_session.commit()

stmt = select(User).where(User.id == test_user.id)
updated_user = db_session.execute(stmt).scalar_one()
assert updated_user.username == "updated"

def test_delete_user(db_session, test_user): """Test user deletion.""" user_id = test_user.id db_session.delete(test_user) db_session.commit()

stmt = select(User).where(User.id == user_id)
assert db_session.execute(stmt).scalar_one_or_none() is None

Performance Optimization

Query Optimization

Use indexes

class User(Base): tablename = "users"

email: Mapped[str] = mapped_column(String(255), index=True, unique=True)
created_at: Mapped[datetime] = mapped_column(index=True)

# Composite index
__table_args__ = (
    Index('ix_user_email_active', 'email', 'is_active'),
)

Use select_from for complex queries

stmt = ( select(func.count(Post.id)) .select_from(User) .join(Post) .where(User.is_active == True) )

Use contains_eager for joined loads

from sqlalchemy.orm import contains_eager

stmt = ( select(Post) .join(Post.author) .options(contains_eager(Post.author)) .where(User.is_active == True) )

Connection Pooling

Configure pool

engine = create_engine( DATABASE_URL, pool_size=20, # Number of connections to keep max_overflow=10, # Additional connections when pool full pool_timeout=30, # Seconds to wait for connection pool_recycle=3600, # Recycle connections after 1 hour pool_pre_ping=True # Verify connections before use )

Monitor pool

from sqlalchemy import event

@event.listens_for(engine, "connect") def receive_connect(dbapi_conn, connection_record): print("New connection established")

@event.listens_for(engine, "checkout") def receive_checkout(dbapi_conn, connection_record, connection_proxy): print("Connection checked out from pool")

Batch Operations

Bulk insert with executemany

from sqlalchemy import insert

data = [ {"email": f"user{i}@example.com", "username": f"user{i}"} for i in range(1000) ]

stmt = insert(User) db.execute(stmt, data) db.commit()

Bulk update

from sqlalchemy import update

stmt = ( update(User) .where(User.is_active == False) .values(deleted_at=func.now()) ) db.execute(stmt)

Best Practices

  • Use SQLAlchemy 2.0 Syntax: Modern API with better type hints

  • Type Annotations: Use Mapped[T] and mapped_column()

  • Eager Loading: Solve N+1 queries with selectinload /joinedload

  • Session Management: Use dependency injection pattern

  • Migrations: Always use Alembic for schema changes

  • Indexes: Add indexes for frequently queried columns

  • Connection Pooling: Configure appropriate pool settings

  • Testing: Use in-memory SQLite for fast tests

  • Async: Use AsyncSession for async frameworks

  • Error Handling: Always handle NoResultFound and MultipleResultsFound

Common Patterns

Repository Pattern

from typing import Generic, TypeVar, Type from sqlalchemy.orm import Session

T = TypeVar('T', bound=Base)

class BaseRepository(Generic[T]): def init(self, model: Type[T], db: Session): self.model = model self.db = db

def get(self, id: int) -> Optional[T]:
    stmt = select(self.model).where(self.model.id == id)
    return self.db.execute(stmt).scalar_one_or_none()

def get_all(self, skip: int = 0, limit: int = 100) -> list[T]:
    stmt = select(self.model).offset(skip).limit(limit)
    return self.db.execute(stmt).scalars().all()

def create(self, obj: T) -> T:
    self.db.add(obj)
    self.db.commit()
    self.db.refresh(obj)
    return obj

def delete(self, id: int) -> bool:
    obj = self.get(id)
    if obj:
        self.db.delete(obj)
        self.db.commit()
        return True
    return False

Usage

user_repo = BaseRepository(User, db) user = user_repo.get(1)

Soft Delete Pattern

class SoftDeleteMixin: deleted_at: Mapped[Optional[datetime]] = mapped_column(default=None)

@property
def is_deleted(self) -> bool:
    return self.deleted_at is not None

class User(Base, SoftDeleteMixin): tablename = "users" # ... fields

Query only active records

stmt = select(User).where(User.deleted_at.is_(None))

Soft delete

user.deleted_at = datetime.utcnow() db.commit()

Audit Trail Pattern

class AuditMixin: created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now() ) updated_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), onupdate=func.now() ) created_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id")) updated_by: Mapped[Optional[int]] = mapped_column(ForeignKey("users.id"))

class Post(Base, AuditMixin): tablename = "posts" # ... fields

Resources

  • SQLAlchemy 2.0 Documentation

  • Alembic Documentation

  • FastAPI SQLAlchemy Guide

  • SQLAlchemy Type Annotations

Related Skills

When using Sqlalchemy, these skills enhance your workflow:

  • django: Django ORM patterns and migration strategies for comparison

  • test-driven-development: TDD patterns for database models and queries

  • fastapi-local-dev: FastAPI + SQLAlchemy integration patterns

  • systematic-debugging: Advanced debugging for ORM query issues and N+1 problems

[Full documentation available in these skills if deployed in your bundle]

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Web3

langchain-framework

No summary provided by upstream source.

Repository SourceNeeds Review
General

drizzle-orm

No summary provided by upstream source.

Repository SourceNeeds Review
General

pydantic

No summary provided by upstream source.

Repository SourceNeeds Review