Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions .github/workflows/pip.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,17 @@ jobs:

- name: pypi-publish
env:
TWINE_USERNAME: ${{ secrets.TWINE_USERNAME }}
TWINE_PASSWORD: ${{ secrets.TWINE_PASSWORD }}
TWINE_USERNAME_TEST: ${{ secrets.TWINE_USERNAME_TEST }}
TWINE_PASSWORD_TEST: ${{ secrets.TWINE_PASSWORD_TEST }}
run: |
env | sort -f && cd src && ls -alh
sudo python3 -c "import fcntl; fcntl.fcntl(1, fcntl.F_SETFL, 0)"
sudo python3 setup.py sdist bdist_wheel
ls -alh ./dist

if [ "${GITHUB_REPOSITORY}" = "QPod/aloha-python" ] && [ "${GITHUB_REF_NAME}" = "main" ] ; then
twine upload dist/* --verbose -u "${TWINE_USERNAME}" -p "${TWINE_PASSWORD}" ;
elif [ ! -z "${TWINE_USERNAME_TEST}" ]; then
twine upload dist/* --verbose -u "${TWINE_USERNAME_TEST}" -p "${TWINE_PASSWORD_TEST}" \
--repository-url "https://test.pypi.org/legacy/" ;
URL_REPOSITORY="https://upload.pypi.org/legacy/"
else
URL_REPOSITORY="https://test.pypi.org/legacy/"
fi
twine upload dist/* --verbose -u "__token__" -p "${TWINE_PASSWORD_TEST}" --repository-url "${URL_REPOSITORY}" ;
122 changes: 122 additions & 0 deletions src/aloha/db/duckdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
__all__ = ('DuckOperator',)

from pathlib import Path

import duckdb
import duckdb_engine
from aloha.logger import LOG
from sqlalchemy import create_engine, text

LOG.debug('duckdb version = %s, duckdb_engine = %s ' % (duckdb.__version__, duckdb_engine.__version__))


class DuckOperator:
def __init__(self, db_config, **kwargs):
"""
db_config example:
{
"path": "/path/to/db.duckdb", # 数据库文件路径,使用 ":memory:" 表示内存模式
"schema": "sales", # 可选,默认 'main'
"read_only": True, # 可选,默认 False (内存模式下强制为 False)
"config": {"memory_limit": "500mb"}# 可选,DuckDB 连接配置
}
"""
self._config = {
'path': db_config.get('path', ':memory:'),
'schema': db_config.get('schema', 'main'),
'read_only': bool(db_config.get('read_only', False)),
'config': db_config.get('config', {}),
}

if not self._config['path'] or self._config['path'] == ':memory:': # 标准化内存模式路径
self._config['path'] = ':memory:'

if self._config['read_only']: # 内存数据库不支持只读模式
LOG.warning("In-memory database cannot be read-only. Setting read_only=False.")
self._config['read_only'] = False

else:
self._prepare_database()

try:
str_connection = f"duckdb:///{self._config['path']}"
self.conn = create_engine(
str_connection,
connect_args={
'read_only': self._config['read_only'],
'config': self._config['config']
},
**kwargs
).connect()

self._initialize_schema()

LOG.debug(
f"DuckDB connected: {self._config['path']} [schema={self._config['schema']}, read_only={self._config['read_only']}]"
)
except Exception as e:
LOG.exception(e)
raise RuntimeError('Failed to connect to DuckDB')

def _prepare_database(self):
"""准备数据库文件和目录"""
path = self._config['path']
path_obj = Path(path)

parent_dir = path_obj.parent
if not parent_dir.exists():
if self._config['read_only']:
raise RuntimeError(
f"Directory '{parent_dir}' does not exist and read_only=True"
)
try:
parent_dir.mkdir(parents=True, exist_ok=True)
LOG.debug(f"Created directory: {parent_dir}")
except Exception as e:
raise RuntimeError(f"Failed to create directory '{parent_dir}': {e}")

if not path_obj.exists():
if self._config['read_only']:
raise RuntimeError(
f"DuckDB file '{path}' does not exist and read_only=True"
)
try:
LOG.debug(f"Database file not found, creating: {path}")
duckdb.connect(path).close()
except Exception as e:
raise RuntimeError(f"Failed to create database file '{path}': {e}")

def _initialize_schema(self):
if self._config['schema'] == 'main':
return

try:
if self._config['read_only']:
result = self.conn.execute(
text("SELECT schema_name FROM information_schema.schemata WHERE schema_name = :schema"),
{'schema': self._config['schema']}
)
if not result.fetchone():
raise RuntimeError(
f"Schema '{self._config['schema']}' does not exist and read_only=True"
)
else:
self.conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {self._config['schema']}"))

self.conn.execute(text(f"SET schema '{self._config['schema']}'"))
except Exception as e:
raise RuntimeError(f'Failed to initialize schema: {e}')

@property
def connection(self):
return self.conn

def execute_query(self, sql, auto_commit: bool = True, *args, **kwargs):
cur = self.conn.execute(text(sql), *args, **kwargs)
if auto_commit:
self.conn.commit()
return cur

@property
def connection_str(self) -> str:
return f"duckdb:///{self._config['path']} [schema={self._config['schema']}, read_only={self._config['read_only']}]"
2 changes: 1 addition & 1 deletion src/aloha/db/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .base import PasswordVault
from ..logger import LOG

LOG.debug('postgres: psycopg2 version = %s' % psycopg.__version__)
LOG.debug('postgres: psycopg version = %s' % psycopg.__version__)


class PostgresOperator:
Expand Down
3 changes: 2 additions & 1 deletion src/aloha/logger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
from ..settings import SETTINGS

LOG = get_logger(
level=SETTINGS.config.get('deploy', {}).get('log_level', 10) # 10 = logging.DEBUG
level=SETTINGS.config.get('deploy', {}).get('log_level', 10), # 10 = logging.DEBUG
# logger_name='default',
)
9 changes: 6 additions & 3 deletions src/aloha/logger/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def setup_logger(logger: logging.Logger, level: int = logging.DEBUG, logger_name
from ..settings import SETTINGS
module = SETTINGS.config.get('APP_MODULE') or os.environ.get('APP_MODULE', 'default')

if logger_name is not None and len(logger_name) > 0:
module = '%s_%s' % (logger_name, module)
# if logger_name is not None and len(logger_name) > 0:
# module = '%s_%s' % (logger_name, module)

path_file = [module, socket.gethostname(), 'p%s' % os.getpid()] # module, hostname, pid
path_file = '_'.join(str(i) for i in path_file if i is not None)
Expand All @@ -34,7 +34,10 @@ def setup_logger(logger: logging.Logger, level: int = logging.DEBUG, logger_name
logger.setLevel(level)


def get_logger(level=logging.DEBUG, logger_name: str = None, *args, **kwargs) -> logging.Logger:
def get_logger(level=logging.DEBUG, logger_name: str | None = None, *args, **kwargs) -> logging.Logger:
if logger_name is None:
logger_name = 'default'

logger = logging.getLogger(logger_name)

if isinstance(level, str):
Expand Down
16 changes: 12 additions & 4 deletions src/aloha/script/compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,19 @@ def build(base: str = None, dist: str = 'build', exclude: list = None, keep: lis
for dir_path, dir_names, file_names in os.walk(path_base):
dir_name = dir_path.split(os.sep)[-1] # name of the current directory

if dir_path.startswith(path_build) or dir_path in files_exclude:
continue # skip: folder for build output, and excluded folders

flag_skip: bool = False
if dir_name.startswith('.') or (os.sep + '.' in dir_path):
continue # hidden folders and sub-folders
flag_skip = True # hidden folders and sub-folders
elif dir_path.startswith(path_build):
flag_skip = True # skip: folder for build output, and excluded folders
else:
for f in files_exclude:
if dir_path.startswith(f):
flag_skip = True
break

if flag_skip:
continue

for file in file_names:
(name, extension), path = os.path.splitext(file), os.path.join(dir_path, file)
Expand Down