Alembic Database Migrations
Alembic is a database migration tool for SQLAlchemy projects that provides version control for your database schema.
Quick Start
Create Migration (Autogenerate)
Generate migration from model changes
alembic revision --autogenerate -m "Add user table"
Check if there are pending changes
alembic check
Apply Migrations
Upgrade to latest version
alembic upgrade head
Upgrade to specific revision
alembic upgrade ae1027a6acf
Downgrade one revision
alembic downgrade -1
Downgrade to base (empty schema)
alembic downgrade base
Check Status
Show current database revision
alembic current
Show all revision history
alembic history
Show revision details
alembic show ae1027a6acf
Common Patterns
Autogenerate Configuration
env.py setup for async SQLAlchemy:
import asyncio from logging.config import fileConfig from sqlalchemy import pool from sqlalchemy.ext.asyncio import async_engine_from_config from alembic import context
Import your models
from app.models import Base from app.config import get_settings
config = context.config settings = get_settings()
Configure database URL for async
database_url = settings.database_url.replace("postgresql://", "postgresql+asyncpg://") config.set_main_option("sqlalchemy.url", database_url)
target_metadata = Base.metadata
async def run_async_migrations(): connectable = async_engine_from_config( config.get_section(config.config_ini_section, {}), prefix="sqlalchemy.", poolclass=pool.NullPool, )
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def do_run_migrations(connection): context.configure( connection=connection, target_metadata=target_metadata, compare_type=True, compare_server_default=True, render_as_batch=False, # Set to True for SQLite )
with context.begin_transaction():
context.run_migrations()
def run_migrations_online(): asyncio.run(run_async_migrations())
if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()
Manual Migration Operations
Common schema changes:
from alembic import op import sqlalchemy as sa
def upgrade(): # Add column op.add_column('users', sa.Column('email', sa.String(255), nullable=True))
# Rename table
op.rename_table('old_table', 'new_table')
# Create index
op.create_index('ix_users_email', 'users', ['email'])
# Add constraint
op.create_check_constraint('ck_age_positive', 'users', 'age > 0')
def downgrade(): # Reverse operations op.drop_constraint('ck_age_positive', 'users') op.drop_index('ix_users_email') op.rename_table('new_table', 'old_table') op.drop_column('users', 'email')
Batch Mode (for SQLite)
Configure batch mode in env.py:
context.configure( connection=connection, target_metadata=target_metadata, render_as_batch=True # Required for SQLite migrations )
Generated batch migration:
def upgrade(): with op.batch_alter_table('users', schema=None) as batch_op: batch_op.add_column(sa.Column('email', sa.String(length=255), nullable=True)) batch_op.create_index('ix_users_email', ['email'], unique=False)
Filtering Objects
Skip certain objects in autogenerate:
def include_object(object, name, type_, reflected, compare_to): # Skip temporary tables if type_ == "table" and name.startswith("temp_"): return False
# Skip columns with skip_autogenerate flag
if type_ == "column" and not reflected:
if object.info.get("skip_autogenerate", False):
return False
return True
context.configure( connection=connection, target_metadata=target_metadata, include_object=include_object )
Filter by schema:
def include_name(name, type_, parent_names): if type_ == "schema": return name in [None, "public", "auth"] # Include default + specific schemas elif type_ == "table": return parent_names["schema_qualified_table_name"] in target_metadata.tables return True
context.configure( connection=connection, target_metadata=target_metadata, include_name=include_name, include_schemas=True )
Custom Migration Processing
Modify generated migrations:
def process_revision_directives(context, revision, directives): script = directives[0]
# Skip empty migrations
if config.cmd_opts.autogenerate and script.upgrade_ops.is_empty():
directives[:] = []
return
# Remove downgrade operations for one-way migrations
script.downgrade_ops.ops[:] = []
context.configure( connection=connection, target_metadata=target_metadata, process_revision_directives=process_revision_directives )
Data Migrations
Migrate data during schema change:
def upgrade(): # Add new column op.add_column('users', sa.Column('full_name', sa.String(255), nullable=True))
# Migrate data
connection = op.get_bind()
connection.execute(
sa.text("UPDATE users SET full_name = first_name || ' ' || last_name")
)
# Make column required after data migration
op.alter_column('users', 'full_name', nullable=False)
def downgrade(): op.drop_column('users', 'full_name')
Branch Migrations
Work with multiple branches:
Create branch
alembic revision -m "Create feature branch" --head=base --branch-label=feature_x
Upgrade specific branch
alembic upgrade feature_x@head
Merge branches
alembic merge -m "Merge feature_x into main" feature_x@head main@head
Practical Code Snippets
Check if Database is Up-to-Date
from alembic import config, script from alembic.runtime import migration from sqlalchemy import create_engine
def is_database_up_to_date(alembic_cfg_path, database_url): """Check if database schema matches latest migrations""" cfg = config.Config(alembic_cfg_path) directory = script.ScriptDirectory.from_config(cfg)
engine = create_engine(database_url)
with engine.begin() as connection:
context = migration.MigrationContext.configure(connection)
current_heads = set(context.get_current_heads())
latest_heads = set(directory.get_heads())
return current_heads == latest_heads
Programmatically Run Migrations
from alembic import command from alembic.config import Config
def run_migrations(alembic_ini_path): """Run all pending migrations""" alembic_cfg = Config(alembic_ini_path) command.upgrade(alembic_cfg, "head")
def create_migration(alembic_ini_path, message, autogenerate=True): """Create new migration""" alembic_cfg = Config(alembic_ini_path) command.revision(alembic_cfg, message=message, autogenerate=autogenerate)
Custom Migration Operations
from alembic.autogenerate import rewriter from alembic.operations import ops
writer = rewriter.Rewriter()
@writer.rewrites(ops.AddColumnOp) def add_column_non_nullable(context, revision, op): """Add non-nullable columns in two steps""" if not op.column.nullable: op.column.nullable = True return [ op, ops.AlterColumnOp( op.table_name, op.column.name, nullable=False, existing_type=op.column.type, schema=op.schema ) ] return op
Use in env.py
context.configure( connection=connection, target_metadata=target_metadata, process_revision_directives=writer )
Requirements
-
Python 3.8+: Required for async support
-
SQLAlchemy 2.0+: For modern async patterns
-
PostgreSQL/MySQL/SQLite: Supported databases
-
Alembic 1.8+: Migration tooling
Common Dependencies
Core dependencies
pip install alembic sqlalchemy
For PostgreSQL with async
pip install asyncpg
For MySQL with async
pip install aiomysql
For SQLite (built-in)
No additional packages needed
Development Setup
Initialize Alembic in existing project
alembic init alembic
Configure env.py for your models
Edit alembic.ini for database URL
First migration
alembic revision --autogenerate -m "Initial schema" alembic upgrade head