Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ Component Breakdown
initialization so that when a decorator function is called, it can setup a connection pool to a correct database
* **is_set_current_database_supported** - this function may be used to determine if the ``*_current_database`` methods
may be used or not
* **set_current_database** - (only supported on Python 3.7+) this function may be used to set the database name for the
current async context (not thread), this is especially useful for multitenant applications
* **reset_current_database** - (only supported on Python 3.7+) helper method to reset the current database after
``set_current_database`` has been used in an async context
* **set_current_database** - this function may be used to set the database name for the current async context
(not thread), this is especially useful for multitenant applications
* **reset_current_database** - helper method to reset the current database after ``set_current_database`` has
been used in an async context
* **set_database_init_hook** - sets a method to call whenever a new database is initialized
* **QueryData** - a class that may be returned or yielded from ``sql*`` decorated methods which
contains query information
Expand Down Expand Up @@ -99,28 +99,46 @@ The ``set_database_init_hook`` method may be used in this case. As an example, t
Multitenancy
============
In some applications, it may be useful to set a database other than the default database in order to support
database-per-tenant configurations. This may be done using the ``set_current_database`` and ``reset_current_database``
methods.
database-per-tenant configurations. This may be done using various provided methods.

.. code-block:: python

from dysql import reset_current_database, set_current_database

def use_database_for_query():
set_database_parameters(
from dysql import (
set_default_connection_parameters,
reset_current_database,
set_current_database,
use_database_tenant,
tenant_database_manager,
sqlquery,
QueryData,
)

def init():
# Initialize all databases up-front using an arbitrary database key to refer to them later
set_default_connection_parameters(
...
database_key='db1',
)
set_default_connection_parameters(
...
'db1',
database_key='db2',
)

def tenant_query_with_manual_set_reset():
set_current_database('db2')
try:
# Queries db2 and not db1
query_database()
finally:
reset_current_database()

.. warning::
These methods are only supported in Python 3.7+ due to their use of the ``contextvars`` module. The
``is_set_current_database_supported`` method is provided to help tell if these methods may be used.
def tenant_query_with_context_manager():
with tenant_database_manager("db2"):
return query_database()

@use_database_tenant("db2")
@sqlquery()
def tenant_query_with_decorator():
return QueryData("SELECT * FROM users")

Decorators
==========
Expand Down
3 changes: 3 additions & 0 deletions dysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
set_database_init_hook,
set_default_connection_parameters,
)
from .multitenancy import use_database_tenant, tenant_database_manager
from .exceptions import DBNotPreparedError


Expand All @@ -53,5 +54,7 @@
"set_current_database",
"set_database_init_hook",
"set_default_connection_parameters",
"use_database_tenant",
"tenant_database_manager",
"DBNotPreparedError",
]
97 changes: 51 additions & 46 deletions dysql/databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
with the terms of the Adobe license agreement accompanying it.
"""

import contextvars
import logging
import sys
from collections import defaultdict
from typing import Callable, Optional

import sqlalchemy
Expand All @@ -17,14 +18,8 @@

logger = logging.getLogger("database")

_DEFAULT_CONNECTION_PARAMS = {}

try:
import contextvars

CURRENT_DATABASE_VAR = contextvars.ContextVar("dysql_current_database", default="")
except ImportError:
CURRENT_DATABASE_VAR = None
_DEFAULT_CONNECTION_PARAMS_BY_KEY = defaultdict(dict)
CURRENT_DATABASE_VAR = contextvars.ContextVar("dysql_current_database", default="")


def set_database_init_hook(
Expand All @@ -41,24 +36,20 @@ def set_database_init_hook(

def is_set_current_database_supported() -> bool:
"""
Determines if the set_current_database method is available on this python runtime.
:return: True if available, False otherwise
Deprecated, left in for backwards compatibility but always returns true.
:return: True
"""
return bool(CURRENT_DATABASE_VAR)
return True


def set_current_database(database: str) -> None:
def set_current_database(database_key: str) -> None:
"""
Sets the current database, may be used for multitenancy. This is only supported on Python 3.7+. This uses
Sets the current database key, may be used for multitenancy. This is only supported on Python 3.7+. This uses
contextvars internally to set the name for the current async context.
:param database: the database name to use for this async context
:param database_key: the arbitrary database key to use for this async context
"""
if not CURRENT_DATABASE_VAR:
raise DBNotPreparedError(
f'Cannot set the current database on Python "{sys.version}", please upgrade your Python version'
)
CURRENT_DATABASE_VAR.set(database)
logger.debug(f"Set current database to {database}")
CURRENT_DATABASE_VAR.set(database_key)
logger.debug(f"Set current database to {database_key}")


def reset_current_database() -> None:
Expand All @@ -69,16 +60,17 @@ def reset_current_database() -> None:
set_current_database("")


def _get_current_database() -> str:
def _get_current_database_key() -> str:
"""
The current database name, using contextvars (if on python 3.7+) or the default database name.
:return: The current database name
The current database key, using contextvars (if on python 3.7+) or the default database key.
:return: The current database key
"""
database: Optional[str] = None
if CURRENT_DATABASE_VAR:
database = CURRENT_DATABASE_VAR.get()
if not database:
database = _DEFAULT_CONNECTION_PARAMS.get("database")
if not database and _DEFAULT_CONNECTION_PARAMS_BY_KEY:
# Get first database key
database = next(iter(_DEFAULT_CONNECTION_PARAMS_BY_KEY))
return database


Expand All @@ -94,12 +86,14 @@ def set_default_connection_parameters(
user: str,
password: str,
database: str,
database_key: Optional[str] = None,
port: int = 3306,
pool_size: int = 10,
pool_recycle: int = 3600,
echo_queries: bool = False,
charset: str = "utf8",
): # pylint: disable=too-many-arguments,unused-argument
collation: Optional[str] = None,
):
"""
Initializes the parameters to use when connecting to the database. This is a subset of the parameters
used by sqlalchemy. These may be overridden by parameters provided in the QueryData, hence the "default".
Expand All @@ -108,27 +102,29 @@ def set_default_connection_parameters(
:param user: user to connect to the database with
:param password: password for given user
:param database: database to connect to
:param database_key: optional database key that may be used for multitenant DBs, defaults to the database name
:param port: the port to connect to (default 3306)
:param pool_size: number of connections to maintain in the connection pool (default 10)
:param pool_recycle: amount of time to wait between resetting the connections
in the pool (default 3600)
:param echo_queries: this tells sqlalchemy to print the queries when set to True (default false)
:param charset: the charset for the sql engine to initialize with. (default utf8)
:param collation: the collation for the sql engine to initialize with. (default is not set)
:exception DBNotPrepareError: happens when required parameters are missing
"""
_validate_param("host", host)
_validate_param("user", user)
_validate_param("password", password)
_validate_param("database", database)

_DEFAULT_CONNECTION_PARAMS.update(locals())
if not database_key:
database_key = database
_DEFAULT_CONNECTION_PARAMS_BY_KEY[database_key].update(locals())


class Database:
# pylint: disable=too-few-public-methods

def __init__(self, database: Optional[str]) -> None:
self.database = database
def __init__(self, database_key: Optional[str]) -> None:
self.database = database_key
# Engine is lazy-initialized
self._engine: Optional[sqlalchemy.engine.Engine] = None

Expand All @@ -142,25 +138,35 @@ def set_init_hook(
@property
def engine(self) -> sqlalchemy.engine.Engine:
if not self._engine:
user = _DEFAULT_CONNECTION_PARAMS.get("user")
password = _DEFAULT_CONNECTION_PARAMS.get("password")
host = _DEFAULT_CONNECTION_PARAMS.get("host")
port = _DEFAULT_CONNECTION_PARAMS.get("port")
charset = _DEFAULT_CONNECTION_PARAMS.get("charset")

url = f"mysql+mysqlconnector://{user}:{password}@{host}:{port}/{self.database}?charset={charset}"
connection_params = _DEFAULT_CONNECTION_PARAMS_BY_KEY.get(self.database, {})
if not connection_params:
raise DBNotPreparedError(
f"No connection parameters found for database key '{self.database}'"
)
user = connection_params.get("user")
password = connection_params.get("password")
database = connection_params.get("database")
host = connection_params.get("host")
port = connection_params.get("port")
charset = connection_params.get("charset")
collation = connection_params.get("collation")
collation_str = ""
if collation:
collation_str = f"&collation={collation}"

url = f"mysql+mysqlconnector://{user}:{password}@{host}:{port}/{database}?charset={charset}{collation_str}"
self._engine = sqlalchemy.create_engine(
url,
pool_recycle=_DEFAULT_CONNECTION_PARAMS.get("pool_recycle"),
pool_size=_DEFAULT_CONNECTION_PARAMS.get("pool_size"),
echo=_DEFAULT_CONNECTION_PARAMS.get("echo_queries"),
pool_recycle=connection_params.get("pool_recycle"),
pool_size=connection_params.get("pool_size"),
echo=connection_params.get("echo_queries"),
pool_pre_ping=True,
)
hook_method: Optional[
Callable[[Optional[str], sqlalchemy.engine.Engine], None]
] = getattr(self.__class__, "hook_method", None)
if hook_method:
hook_method(self.database, self._engine)
hook_method(database, self._engine)

return self._engine

Expand All @@ -178,7 +184,7 @@ def __getitem__(self, database: Optional[str]) -> Database:
:return: a database instance
:raises DBNotPreparedError: when set_default_connection_parameters has not yet been called
"""
if not _DEFAULT_CONNECTION_PARAMS:
if not _DEFAULT_CONNECTION_PARAMS_BY_KEY:
raise DBNotPreparedError(
"Unable to connect to a database, set_default_connection_parameters must first be called"
)
Expand All @@ -192,8 +198,7 @@ def current_database(self) -> Database:
"""
The current database instance, retrieved using contextvars (if python 3.7+) or the default database.
"""
# pylint: disable=unnecessary-dunder-call
return self.__getitem__(_get_current_database())
return self.__getitem__(_get_current_database_key())


class DatabaseContainerSingleton(DatabaseContainer):
Expand Down
69 changes: 69 additions & 0 deletions dysql/multitenancy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Copyright 2025 Adobe
All Rights Reserved.

NOTICE: Adobe permits you to use, modify, and distribute this file in accordance
with the terms of the Adobe license agreement accompanying it.
"""

import functools
import logging
from typing import Any, Callable, TypeVar
from contextlib import contextmanager

from dysql.exceptions import DBNotPreparedError
from dysql import (
set_current_database,
reset_current_database,
)

LOGGER = logging.getLogger(__name__)

F = TypeVar("F", bound=Callable[..., Any])


@contextmanager
def tenant_database_manager(database_key: str):
"""
Context manager for temporarily switching to a different database.

:param database_key: the database key to switch to
:raises DBNotPreparedError: if the database key is not set
"""
if not database_key:
raise DBNotPreparedError(
"Cannot switch to database tenant with empty database key"
)

try:
LOGGER.debug(f"Switching to database {database_key}")
set_current_database(database_key)
yield
except Exception as e:
LOGGER.error(f"Error while using database {database_key}: {e}")
raise
finally:
try:
reset_current_database()
LOGGER.debug(f"Reset database context from: {database_key}")
except Exception as e:
LOGGER.error(f"Error resetting database context: {e}")
# Don't re-raise here to avoid masking the original exception


def use_database_tenant(database_key: str):
"""
Decorator that switches to a specific database for the duration of the function call.
:param database_key: the database key to use
:return: the decorator function
"""

def decorator(func: F) -> F:
@functools.wraps(func)
def wrapper(*args, **kwargs):
with tenant_database_manager(database_key):
return func(*args, **kwargs)

return wrapper

return decorator
Loading