Try to use migrations in tests

This commit is contained in:
stijndcl 2022-07-19 18:49:22 +02:00
parent f4056d8af6
commit 9401111bee
5 changed files with 105 additions and 84 deletions

View file

@ -7,15 +7,17 @@ from sqlalchemy.orm import sessionmaker
import settings
encoded_password = quote_plus(settings.DB_PASSWORD)
url = URL.create(
drivername="postgresql+asyncpg",
username=settings.DB_USERNAME,
password=encoded_password,
host=settings.DB_HOST,
port=settings.DB_PORT,
database=settings.DB_NAME,
)
engine = create_async_engine(
URL.create(
drivername="postgresql+asyncpg",
username=settings.DB_USERNAME,
password=encoded_password,
host=settings.DB_HOST,
port=settings.DB_PORT,
database=settings.DB_NAME,
),
url,
pool_pre_ping=True,
future=True,
)

View file

@ -1,16 +1,27 @@
import logging
from alembic import config, script
from alembic.runtime import migration
from database.engine import engine
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session
__all__ = ["ensure_latest_migration"]
from alembic import command, config, script
from alembic.config import Config
from alembic.runtime import migration
from database.engine import engine, url
__config_path__ = "alembic.ini"
__migrations_path__ = "alembic/"
cfg = Config(__config_path__)
cfg.set_main_option("script_location", __migrations_path__)
__all__ = ["ensure_latest_migration", "migrate"]
async def ensure_latest_migration():
"""Make sure we are currently on the latest revision, otherwise raise an exception"""
alembic_config = config.Config("alembic.ini")
alembic_script = script.ScriptDirectory.from_config(alembic_config)
alembic_script = script.ScriptDirectory.from_config(cfg)
async with engine.begin() as connection:
current_revision = await connection.run_sync(
@ -25,3 +36,20 @@ async def ensure_latest_migration():
)
logging.error(error_message)
raise RuntimeError(error_message)
def __execute_upgrade(connection: Session):
cfg.attributes["connection"] = connection
command.upgrade(cfg, "head")
def __execute_downgrade(connection: Session):
cfg.attributes["connection"] = connection
command.downgrade(cfg, "base")
async def migrate(up: bool):
"""Migrate the database upwards or downwards"""
async_engine = create_async_engine(url, echo=True)
async with async_engine.begin() as connection:
await connection.run_sync(__execute_upgrade if up else __execute_downgrade)