From 688a08eb92785f3876daccfcfe778e4739f6acc9 Mon Sep 17 00:00:00 2001
From: Bibo Hao
Date: Wed, 24 Dec 2025 08:51:20 +0000
Subject: [PATCH 1/5] init oracledb

---
 src/aloha/db/oracle.py                       | 84 ++++++++++++++++++++
 tool/{app.Dockerfile => app-demo.Dockerfile} |  2 +-
 tool/cicd/docker-compose.app-demo.yml        | 28 +++++++
 tool/cicd/run-dev.sh                         | 84 ++++++++++++++++++++
 tool/run_config-aliyun.sh                    | 47 -----------
 tool/run_config-common.sh                    | 46 ----------
 tool/run_config-tsinghua.sh                  | 47 -----------
 7 files changed, 197 insertions(+), 141 deletions(-)
 create mode 100644 src/aloha/db/oracle.py
 rename tool/{app.Dockerfile => app-demo.Dockerfile} (97%)
 create mode 100644 tool/cicd/docker-compose.app-demo.yml
 create mode 100644 tool/cicd/run-dev.sh
 delete mode 100644 tool/run_config-aliyun.sh
 delete mode 100644 tool/run_config-common.sh
 delete mode 100644 tool/run_config-tsinghua.sh

diff --git a/src/aloha/db/oracle.py b/src/aloha/db/oracle.py
new file mode 100644
index 0000000..9c99b29
--- /dev/null
+++ b/src/aloha/db/oracle.py
@@ -0,0 +1,84 @@
+__all__ = ('OracledbOperator',)
+
+import oracledb
+from sqlalchemy import create_engine
+from sqlalchemy.sql import text
+
+
+from .base import PasswordVault
+from ..logger import LOG
+
+LOG.debug('Version of oracledb = %s' % oracledb.__version__)
+
+
+class OracledbOperator:
+    def __init__(self, db_config, **kwargs):
+        """example of db_config:
+        {
+            "host": "192.168.1.100",
+            "port": 1521,
+            "user": "PT_INDEX",
+            "password": "vault_key_or_plain",
+            "service_name": "orcl",  # using service_name is recommended
+            "sid": "orcl",  # or use sid
+            "vault_type": "...",
+            "vault_config": {...},
+            "lib_dir": "/opt/oracle/instantclient"  # optional, use THICK mode if defined.
+        }
+        """
+
+        self._config = {
+            'host': db_config['host'],
+            'port': db_config['port'],
+            'user': db_config['user'],
+            'password': db_config['password'],
+        }
+
+        if 'lib_dir' in db_config:  # use Thick mode
+            try:
+                oracledb.init_oracle_client(lib_dir=db_config['lib_dir'])
+                print("Oracle client initialized in THICK mode from: %s" % db_config['lib_dir'])
+            except Exception as e:
+                print(f"Warning: {e}")
+                raise RuntimeError(f"Failed to initialize Oracle client: {e}")
+
+        print("is_thin =", oracledb.is_thin_mode())
+
+        service_name = db_config.get("service_name")
+        sid = db_config.get("sid")
+
+        if service_name:  # using service_name (recommended)
+            dsn = oracledb.makedsn(db_config['host'], db_config['port'], service_name=service_name)
+        elif sid:  # using SID
+            dsn = oracledb.makedsn(db_config['host'], db_config['port'], sid=sid)
+        else:
+            raise ValueError("Oracle config must specify service_name or sid")
+
+        self._config["dsn"] = dsn
+        try:
+            self.engine = create_engine(
+                "oracle+oracledb://{user}:{password}@".format(**self._config),
+                connect_args={"dsn": dsn},
+                pool_size=20,
+                max_overflow=10,
+                pool_pre_ping=True,
+                **kwargs
+            )
+            msg = "OracleDB connected: {host}:{port}".format(**self._config)
+            print(msg)
+        except Exception as e:
+            raise RuntimeError(f"Failed to connect to OracleDB: {e}")
+
+    @property
+    def connection(self):
+        return self.engine
+
+    def execute_query(self, sql, *args, **kwargs):
+        """returns SQLAlchemy CursorResult"""
+        with self.engine.connect() as conn:
+            cur = conn.execute(text(sql), *args, **kwargs)
+            return cur
+
+    @property
+    def connection_str(self) -> str:
+        return "oracle://{user}@{host}:{port}".format(**self._config)
diff --git a/tool/app.Dockerfile b/tool/app-demo.Dockerfile
similarity index 97%
rename from tool/app.Dockerfile
rename to tool/app-demo.Dockerfile
index e46b5bb..8fe460b
100644 --- a/tool/app.Dockerfile +++ b/tool/app-demo.Dockerfile @@ -1,5 +1,5 @@ ARG BASE_NAMESPACE="docker.io" -ARG BASE_IMG="qpod/base:latest" +ARG BASE_IMG="labnow/base:latest" ARG DIR_APP="/root/app" ARG MIRROR_PROFILE="common" diff --git a/tool/cicd/docker-compose.app-demo.yml b/tool/cicd/docker-compose.app-demo.yml new file mode 100644 index 0000000..e346aca --- /dev/null +++ b/tool/cicd/docker-compose.app-demo.yml @@ -0,0 +1,28 @@ +name: ${PROJECT_NAME:-DEV-app-demo} + +services: + dev-app-demo: + container_name: ${CONTAINER_NAME:-dev-app-demo} + hostname: ${CONTAINER_NAME:-dev-app-demo} + build: + context: ../../ + dockerfile: src/dev-app-demo.Dockerfile + args: + - ENABLE_CODE_BUILD=false + - PROFILE_LOCALIZE=${PROFILE_LOCALIZE:-aliyun-pub} + pull: true + restart: unless-stopped + environment: + - PYTHONUNBUFFERED=1 + - PYTHONDONTWRITEBYTECODE=1 + - PROFILE_LOCALIZE=${PROFILE_LOCALIZE:-aliyun-pub} + - ENV_PROFILE=${ENV_PROFILE:-DEV} + # env_file: ["../credentials/DEV-app-demo.env"] + user: "0:0" + ports: ["${PORT_APP:-40000}:9000", "${PORT_WEB:-43000}:3000"] + volumes: + - ../../doc:/root/doc:rw + - ../../notebook:/root/notebook:rw + - ../../src:/root/src:rw + working_dir: /root/src + command: "tail -f /dev/null" diff --git a/tool/cicd/run-dev.sh b/tool/cicd/run-dev.sh new file mode 100644 index 0000000..3d9020f --- /dev/null +++ b/tool/cicd/run-dev.sh @@ -0,0 +1,84 @@ +#!/bin/bash +set -euo pipefail + +ACTION="${1:-up}" + +USERNAME="$(whoami)" +USERID="$(id -u)" + + +BASE_APP_PORT=40000 +BASE_WEB_PORT=43000 +export PORT_APP=$((BASE_APP_PORT + USERID)) +export PORT_WEB=$((BASE_WEB_PORT + USERID)) +export PROJECT_NAME="dev-app-demo-${USERNAME}" +export CONTAINER_NAME="dev-app-demo-${USERNAME}" + + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +shopt -s nullglob +FILES=("$SCRIPT_DIR"/docker-compose.*.DEV.yml) +if [ ${#FILES[@]} -eq 0 ]; then + echo "❌ No docker-compose.*.DEV.yml found in $SCRIPT_DIR" + exit 1 +fi +COMPOSE="${FILES[0]}" + +# ---------- Port checking function ---------- +check_port() { + local port=$1 + if ss -lnt | awk '{print $4}' | grep -q ":$port$"; then + echo "❌ ERROR: Port $port is already in use. Aborting." + exit 1 + fi +} + +require_ports_free() { + echo "🔍 Checking port availability..." + check_port "$PORT_APP" + check_port "$PORT_WEB" + echo "✅ Ports available." +} + +# ---------- Action Dispatcher ---------- +echo "----------------------------------------" +echo "User: $USERNAME (UID: $USERID)" +echo "Project Name: $PROJECT_NAME" +echo "Container: $CONTAINER_NAME" +echo "App Port Expose: $PORT_APP" +echo "Web Port Expose: $PORT_WEB" +echo "Action: $ACTION" +echo "Compose: $COMPOSE" +echo "----------------------------------------" + +docker network create net-ailab-proxy || docker network create net-db-common || true ; + +case "$ACTION" in + up) + require_ports_free + docker-compose -f "$COMPOSE" -p "$PROJECT_NAME" up -d + echo "✅ Development environment launched." + ;; + restart) + require_ports_free + docker-compose -f "$COMPOSE" -p "$PROJECT_NAME" restart + echo "🔁 Restart completed." + ;; + down) + docker-compose -f "$COMPOSE" -p "$PROJECT_NAME" down + echo "🛑 Environment stopped." 
+        ;;
+    enter)
+        echo "↩️ Entering development container:"
+        docker exec -it "$CONTAINER_NAME" bash
+        ;;
+    logs)
+        echo "🕵 Attach to container for log inspection:"
+        docker-compose -f "$COMPOSE" -p "$PROJECT_NAME" logs -f
+        ;;
+    *)
+        echo "Usage: $0 [up|down|restart|enter|logs]"
+        exit 1
+        ;;
+esac
diff --git a/tool/run_config-aliyun.sh b/tool/run_config-aliyun.sh
deleted file mode 100644
index 14d5556..0000000
--- a/tool/run_config-aliyun.sh
+++ /dev/null
@@ -1,47 +0,0 @@
-#! /bin/sh
-set -ex
-
-export TZ=${TZ:="Asia/Shanghai"}
-ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ >/etc/timezone
-echo "Setup timezone, current date: $(date)"
-
-if [ -f /etc/apt/sources.list ]; then
-  echo "Found Ubuntu system, setting ubuntu mirror"
-  sed -i 's/mirrors.*.com/mirrors.aliyun.com/' /etc/apt/sources.list
-  sed -i 's/archive.ubuntu.com/mirrors.aliyun.com/' /etc/apt/sources.list
-  sed -i 's/security.ubuntu.com/mirrors.aliyun.com/' /etc/apt/sources.list
-fi
-
-if [ -f /etc/yum.repos.d/CentOS-Base.repo ]; then
-  echo "Found CentOS system, setting CentOS mirror"
-  sed -i 's/mirror.centos.org/mirrors.aliyun.com/' /etc/yum.repos.d/CentOS-Base.repo
-  sed -i 's/mirrorlist=/#mirrorlist=/' /etc/yum.repos.d/CentOS-Base.repo
-  sed -i 's/#baseurl/baseurl/' /etc/yum.repos.d/CentOS-Base.repo
-fi
-
-if [ -f "$(which python)" ]; then
-  echo "Found python, setting pypi source in /etc/pip.conf"
-  cat >/etc/pip.conf <>/etc/login.defs
diff --git a/tool/run_config-common.sh b/tool/run_config-common.sh
deleted file mode 100644
index fcd190a..0000000
--- a/tool/run_config-common.sh
+++ /dev/null
@@ -1,46 +0,0 @@
-#! /bin/sh
-set -ex
-
-export TZ=${TZ:="Asia/Shanghai"}
-ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ >/etc/timezone
-echo "Setup timezone, current date: $(date)"
-
-if [ -f /etc/apt/sources.list ]; then
-  echo "Found Ubuntu system, setting ubuntu mirror"
-  # sed -i 's/mirrors.*.com/mirrors.*.com.cn/' /etc/apt/sources.list
-  # sed -i 's/archive.ubuntu.com/mirrors.*.com.cn/' /etc/apt/sources.list
-  # sed -i 's/security.ubuntu.com/mirrors.*.com.cn/' /etc/apt/sources.list
-fi
-
-if [ -f /etc/yum.repos.d/CentOS-Base.repo ]; then
-  echo "Found CentOS system, setting CentOS mirror"
-  # sed -i 's/mirror.centos.org/mirrors.*.com.cn/' /etc/yum.repos.d/CentOS-Base.repo
-  # sed -i 's/mirrorlist=/#mirrorlist=/' /etc/yum.repos.d/CentOS-Base.repo
-  # sed -i 's/#baseurl/baseurl/' /etc/yum.repos.d/CentOS-Base.repo
-fi
-
-if [ -f "$(which python)" ]; then
-  echo "Found python, setting pypi source in /etc/pip.conf"
-  cat >/etc/pip.conf <>/etc/login.defs
diff --git a/tool/run_config-tsinghua.sh b/tool/run_config-tsinghua.sh
deleted file mode 100644
index 356a72f..0000000
--- a/tool/run_config-tsinghua.sh
+++ /dev/null
@@ -1,47 +0,0 @@
-#!
/bin/sh -set -ex - -export TZ=${TZ:="Asia/Shanghai"} -ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ >/etc/timezone -echo "Setup timezone, current date: $(date)" - -if [ -f /etc/apt/sources.list ]; then - echo "Found Ubuntu system, setting ubuntu mirror" - sed -i 's/mirrors.*.com/mirrors.tuna.tsinghua.edu.cn/' /etc/apt/sources.list - sed -i 's/archive.ubuntu.com/mirrors.tuna.tsinghua.edu.cn/' /etc/apt/sources.list - sed -i 's/security.ubuntu.com/mirrors.tuna.tsinghua.edu.cn/' /etc/apt/sources.list -fi - -if [ -f /etc/yum.repos.d/CentOS-Base.repo ]; then - echo "Found CentOS system, setting CentOS mirror" - sed -i 's/mirror.centos.org/mirrors.tuna.tsinghua.edu.cn/' /etc/yum.repos.d/CentOS-Base.repo - sed -i 's/mirrorlist=/#mirrorlist=/' /etc/yum.repos.d/CentOS-Base.repo - sed -i 's/#baseurl/baseurl/' /etc/yum.repos.d/CentOS-Base.repo -fi - -if [ -f "$(which python)" ]; then - echo "Found python, setting pypi source in /etc/pip.conf" - cat >/etc/pip.conf <>/etc/login.defs From 09a7b1782dc30e9606e1630e2977a909c3a15e29 Mon Sep 17 00:00:00 2001 From: Bibo Hao Date: Wed, 24 Dec 2025 09:33:25 +0000 Subject: [PATCH 2/5] update cicd --- tool/app-demo.Dockerfile | 18 ++++++++------ ...mo.yml => docker-compose.app-demo.DEV.yml} | 6 ++--- tool/cicd/run-dev.sh | 0 tool/dev-demo.Dockerfile | 24 +++++++++++++++++++ 4 files changed, 38 insertions(+), 10 deletions(-) rename tool/cicd/{docker-compose.app-demo.yml => docker-compose.app-demo.DEV.yml} (81%) mode change 100644 => 100755 tool/cicd/run-dev.sh create mode 100644 tool/dev-demo.Dockerfile diff --git a/tool/app-demo.Dockerfile b/tool/app-demo.Dockerfile index 8fe460b..5b3d805 100644 --- a/tool/app-demo.Dockerfile +++ b/tool/app-demo.Dockerfile @@ -1,7 +1,9 @@ -ARG BASE_NAMESPACE="docker.io" +ARG BASE_NAMESPACE="quay.io" ARG BASE_IMG="labnow/base:latest" ARG DIR_APP="/root/app" -ARG MIRROR_PROFILE="common" + +# this var PROFILE_LOCALIZE will be used in /opt/utils/script-localize.sh +ARG PROFILE_LOCALIZE="aliyun-pub" # Stage 1: compile the code FROM ${BASE_NAMESPACE:+$BASE_NAMESPACE/}${BASE_IMG} AS builder @@ -9,23 +11,24 @@ FROM ${BASE_NAMESPACE:+$BASE_NAMESPACE/}${BASE_IMG} AS builder ARG ENABLE_CODE_BUILD="true" ARG DIR_APP -ARG MIRROR_PROFILE +ARG PROFILE_LOCALIZE + COPY . /tmp/app RUN set -ex && cd /tmp/app && mkdir -pv ${DIR_APP} \ - && sh ./tool/run_config-${MIRROR_PROFILE}.sh \ + && source /opt/utils/script-localize.sh ${PROFILE_LOCALIZE} \ && if [[ "$ENABLE_CODE_BUILD" = "true" ]] ; then \ echo "-> Building src to binary..." && pip install -U aloha[build] && \ aloha compile --base=./src --dist=${DIR_APP}/ ; \ else \ echo "-> Not building src code!" 
&& mv src/* ${DIR_APP} ; \ fi \ - && mv ./tool/run_config-${MIRROR_PROFILE}.sh ${DIR_APP}/run_config.sh && ls -al ${DIR_APP} ./* + && ls -al ${DIR_APP} ./* # Stage 2: copy the built code and install packages FROM ${BASE_NAMESPACE:+$BASE_NAMESPACE/}${BASE_IMG} -LABEL maintainer=haobibo@gmail.com +LABEL maintainer=haobibo@labnow.ai ARG DIR_APP ARG PORT_SVC @@ -36,7 +39,8 @@ COPY --from=builder ${DIR_APP} ${DIR_APP}/ ENV PORT_SVC=${PORT_SVC:-80} \ ENTRYPOINT="app_common.debug" -RUN set -ex && pwd && ls -al && sh ./run_config.sh \ +RUN set -eux && pwd && ls -alh \ + && source /opt/utils/script-localize.sh ${PROFILE_LOCALIZE} \ && pip install -U --no-cache-dir pip \ && ( [ -f ./requirements.txt ] && pip install -U --no-cache-dir -r ./requirements.txt || true ) && pip list \ && rm -rf ~/.cache && ls -al && printenv | sort diff --git a/tool/cicd/docker-compose.app-demo.yml b/tool/cicd/docker-compose.app-demo.DEV.yml similarity index 81% rename from tool/cicd/docker-compose.app-demo.yml rename to tool/cicd/docker-compose.app-demo.DEV.yml index e346aca..6a87c37 100644 --- a/tool/cicd/docker-compose.app-demo.yml +++ b/tool/cicd/docker-compose.app-demo.DEV.yml @@ -6,16 +6,16 @@ services: hostname: ${CONTAINER_NAME:-dev-app-demo} build: context: ../../ - dockerfile: src/dev-app-demo.Dockerfile + dockerfile: tool/dev-demo.Dockerfile args: - ENABLE_CODE_BUILD=false - - PROFILE_LOCALIZE=${PROFILE_LOCALIZE:-aliyun-pub} + - PROFILE_LOCALIZE=${PROFILE_LOCALIZE:-default} pull: true restart: unless-stopped environment: - PYTHONUNBUFFERED=1 - PYTHONDONTWRITEBYTECODE=1 - - PROFILE_LOCALIZE=${PROFILE_LOCALIZE:-aliyun-pub} + - PROFILE_LOCALIZE=${PROFILE_LOCALIZE:-default} - ENV_PROFILE=${ENV_PROFILE:-DEV} # env_file: ["../credentials/DEV-app-demo.env"] user: "0:0" diff --git a/tool/cicd/run-dev.sh b/tool/cicd/run-dev.sh old mode 100644 new mode 100755 diff --git a/tool/dev-demo.Dockerfile b/tool/dev-demo.Dockerfile new file mode 100644 index 0000000..7730e46 --- /dev/null +++ b/tool/dev-demo.Dockerfile @@ -0,0 +1,24 @@ +ARG BASE_NAMESPACE="quay.io" +ARG BASE_IMG="labnow/node:latest" + +# this var PROFILE_LOCALIZE will be used in /opt/utils/script-localize.sh +ARG PROFILE_LOCALIZE="aliyun-pub" + +FROM ${BASE_NAMESPACE:+$BASE_NAMESPACE/}${BASE_IMG} AS dev + +ARG PROFILE_LOCALIZE + +# COPY src/requirements.txt /tmp/ + +USER root +RUN set -eux && pwd && ls -alh \ + && source /opt/utils/script-localize.sh ${PROFILE_LOCALIZE} \ + # ----------- handle frontend matters ----------- + && npm install -g pnpm \ + # ----------- handle backend matters ------------ + && pip install -U --no-cache-dir pip jupyterlab \ + # && pip install -U --no-cache-dir -r /tmp/requirements.txt \ + # ----------- install db client to connect db via terminal ------------ + && source /opt/utils/script-setup-db-clients.sh && setup_postgresql_client 17 \ + # ----------- clean up ----------- + && source /opt/utils/script-setup.sh && list_installed_packages && install__clean From 58e8db12167ea6d0a7c25a82a377cb3245e7c78f Mon Sep 17 00:00:00 2001 From: Bibo Hao Date: Wed, 24 Dec 2025 15:06:37 +0000 Subject: [PATCH 3/5] update cicd --- demo/app_common/ainlp/__init__.py | 0 demo/app_common/ainlp/model_bert.py | 86 -------- demo/app_common/ainlp/test-gpu-async.py | 0 demo/main.py | 19 ++ demo/requirements.txt | 1 + src/aloha/logger/__init__.py | 2 + src/aloha/service/streamer/__init__.py | 4 - src/aloha/service/streamer/base.py | 209 -------------------- src/aloha/service/streamer/managed_model.py | 22 --- src/aloha/service/streamer/multiprocess.py | 89 
--------- src/aloha/service/streamer/redis.py | 164 --------------- src/aloha/service/streamer/threaded.py | 37 ---- src/setup.py | 2 +- tool/cicd/docker-compose.app-demo.DEV.yml | 6 +- tool/cicd/docker-compose.db.yml | 28 +++ tool/cicd/run-dev.sh | 1 - 16 files changed, 55 insertions(+), 615 deletions(-) delete mode 100644 demo/app_common/ainlp/__init__.py delete mode 100644 demo/app_common/ainlp/model_bert.py delete mode 100644 demo/app_common/ainlp/test-gpu-async.py create mode 100644 demo/main.py create mode 100644 demo/requirements.txt delete mode 100644 src/aloha/service/streamer/__init__.py delete mode 100644 src/aloha/service/streamer/base.py delete mode 100644 src/aloha/service/streamer/managed_model.py delete mode 100644 src/aloha/service/streamer/multiprocess.py delete mode 100644 src/aloha/service/streamer/redis.py delete mode 100644 src/aloha/service/streamer/threaded.py create mode 100644 tool/cicd/docker-compose.db.yml diff --git a/demo/app_common/ainlp/__init__.py b/demo/app_common/ainlp/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/demo/app_common/ainlp/model_bert.py b/demo/app_common/ainlp/model_bert.py deleted file mode 100644 index 477a895..0000000 --- a/demo/app_common/ainlp/model_bert.py +++ /dev/null @@ -1,86 +0,0 @@ -from typing import List - -import torch -from transformers import AutoTokenizer, AutoModel - -from aloha.service.streamer import ManagedModel - -SEED = 0 -torch.manual_seed(SEED) -torch.cuda.manual_seed(SEED) - - -class TextUnmaskModel: - def __init__(self, max_sent_len=16, model_path="bert-base-uncased"): - self.model_path = model_path - self.tokenizer = AutoTokenizer.from_pretrained(self.model_path) - self.transformer = AutoModel.from_pretrained(self.model_path) - self.transformer.eval() - self.transformer.to(device="cuda") - self.max_sent_len = max_sent_len - - def predict(self, batch: List[str]) -> List[str]: - """predict masked word""" - batch_inputs = [] - masked_indexes = [] - - for text in batch: - tokenized_text = self.tokenizer.tokenize(text) - if len(tokenized_text) > self.max_sent_len - 2: - tokenized_text = tokenized_text[: self.max_sent_len - 2] - - tokenized_text = ['[CLS]'] + tokenized_text + ['[SEP]'] - tokenized_text += ['[PAD]'] * (self.max_sent_len - len(tokenized_text)) - - indexed_tokens = self.tokenizer.convert_tokens_to_ids(tokenized_text) - batch_inputs.append(indexed_tokens) - masked_indexes.append(tokenized_text.index('[MASK]')) - - tokens_tensor = torch.tensor(batch_inputs).to("cuda") - - with torch.no_grad(): - # prediction_scores: ``torch.FloatTensor`` of shape ``(batch_size, sequence_length, config.vocab_size)`` - prediction_scores = self.transformer(tokens_tensor)[0] - - batch_outputs = [] - for i in range(len(batch_inputs)): - predicted_index = torch.argmax(prediction_scores[i, masked_indexes[i]]).item() - predicted_token = self.tokenizer.convert_ids_to_tokens(predicted_index) - batch_outputs.append(predicted_token) - - return batch_outputs - - -class ManagedBertModel(ManagedModel): - def init_model(self): - self.model = TextUnmaskModel() - - def predict(self, batch): - return self.model.predict(batch) - - -def test_simple(): - tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased") - model = AutoModel.from_pretrained("bert-base-uncased") - inputs = tokenizer("Hello! 
My name is [MASK]!", return_tensors="pt")
-    outputs = model(**inputs)
-    print(outputs)
-
-    predicted_index = torch.argmax(outputs[1]).item()
-    predicted_token = tokenizer.convert_ids_to_tokens(predicted_index)
-    print(predicted_token)
-
-
-def test_batch():
-    batch_text = [
-        "twinkle twinkle [MASK] star.",
-        "Happy birthday to [MASK].",
-        'the answer to life, the [MASK], and everything.'
-    ]
-    model = TextUnmaskModel()
-    outputs = model.predict(batch_text)
-    print(outputs)
-
-
-if __name__ == "__main__":
-    test_simple()
diff --git a/demo/app_common/ainlp/test-gpu-async.py b/demo/app_common/ainlp/test-gpu-async.py
deleted file mode 100644
index e69de29..0000000
diff --git a/demo/main.py b/demo/main.py
new file mode 100644
index 0000000..7a44fb9
--- /dev/null
+++ b/demo/main.py
@@ -0,0 +1,19 @@
+import importlib
+import sys
+
+usage = """
+Usage: python3 main.py module.name
+    the `module.name` should be a Python module or package that provides a `main()` function
+"""
+
+if len(sys.argv) < 2:
+    print(usage)
+    exit(-1)
+
+m = importlib.import_module(sys.argv[1])
+f_main = getattr(m, "main", None)
+
+if f_main is None:
+    print("Given module does not provide a `main()` function!")
+else:
+    f_main()
diff --git a/demo/requirements.txt b/demo/requirements.txt
new file mode 100644
index 0000000..43c6e50
--- /dev/null
+++ b/demo/requirements.txt
@@ -0,0 +1 @@
+aloha[all]
diff --git a/src/aloha/logger/__init__.py b/src/aloha/logger/__init__.py
index 958a90c..e21cda3 100644
--- a/src/aloha/logger/__init__.py
+++ b/src/aloha/logger/__init__.py
@@ -1,3 +1,5 @@
+__all__ = ("LOG", "get_logger")
+
 from .logger import get_logger
 from ..settings import SETTINGS
diff --git a/src/aloha/service/streamer/__init__.py b/src/aloha/service/streamer/__init__.py
deleted file mode 100644
index a3f1b44..0000000
--- a/src/aloha/service/streamer/__init__.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from .managed_model import ManagedModel
-from .multiprocess import MultiProcessStreamer
-from .redis import RedisStreamer, RedisWorker, run_redis_workers_forever
-from .threaded import ThreadedStreamer
diff --git a/src/aloha/service/streamer/base.py b/src/aloha/service/streamer/base.py
deleted file mode 100644
index 6bfa38a..0000000
--- a/src/aloha/service/streamer/base.py
+++ /dev/null
@@ -1,209 +0,0 @@
-import abc
-import logging
-import os
-import queue
-import threading
-import time
-import uuid
-import weakref
-
-TIME_SLEEP = 0.001
-TIMEOUT = 1.0
-TIMEOUT_WORKER = 20
-
-logger = logging.getLogger()
-
-
-# logger.setLevel("INFO")
-
-
-class FutureCache(dict):
-    # Dict for weakref only
-    pass
-
-
-class Future:
-    def __init__(self, task_id, task_size: int, future_cache_ref):
-        self._id = task_id
-        self._size = task_size
-        self._future_cache_ref = future_cache_ref
-        self._outputs = []
-        self._finish_event = threading.Event()
-
-    def done(self):
-        return self._finish_event.is_set()
-
-    def result(self, timeout=None):
-        if self._size == 0:
-            self._finish_event.set()
-            return []
-        finished = self._finish_event.wait(timeout)
-
-        if not finished:
-            raise TimeoutError("Task: %d Timeout" % self._id)
-
-        # remove from future_cache
-        future_cache = self._future_cache_ref()
-        if future_cache is not None:
-            del future_cache[self._id]
-
-        # [(request_id, output), ...]
sorted by request_id - self._outputs.sort(key=lambda i: i[0]) - # restore batch result from outputs - batch_result = [i[1] for i in self._outputs] - - return batch_result - - def append_result(self, it_id, it_output): - self._outputs.append((it_id, it_output)) - if len(self._outputs) >= self._size: - self._finish_event.set() - - -class BaseWorker(abc.ABC): - __slots__ = ('_request_queue', '_response_queue') - - def __init__(self, func_predict: callable, batch_size: int, max_latency: float, destroy_event=None, *args, **kwargs): - # assert callable(predict_function) - self.pid = os.getpid() - self.func_predict = func_predict - self.batch_size = batch_size - self.max_latency = max_latency - self.destroy_event = destroy_event # kwargs.get("destroy_event", None) - - def _recv_request(self, timeout: float = TIMEOUT): - try: - item = self._request_queue.get(timeout=timeout) - except queue.Empty: - raise TimeoutError - else: - return item - - def _send_response(self, client_id, task_id, request_id, model_output): - self._response_queue.put((task_id, request_id, model_output)) - - def run_forever(self, *args, **kwargs): - self.pid = os.getpid() # overwrite the pid - logger.info("[gpu worker %d] %s start working" % (self.pid, self)) - - while True: - handled = self._run_once() - if self.destroy_event and self.destroy_event.is_set(): - break - if not handled: - # sleep if no data handled last time - time.sleep(TIME_SLEEP) - logger.info("[gpu worker %d] %s shutdown" % (self.pid, self)) - - def model_predict(self, batch_input): - batch_result = self.func_predict(batch_input) - assert len(batch_input) == len(batch_result), "input batch size {} and output batch size {} must be equal.".format(len(batch_input), len(batch_result)) - return batch_result - - def _run_once(self): - batch = [] - start_time = time.time() - for i in range(self.batch_size): - try: - item = self._recv_request(timeout=self.max_latency) - except TimeoutError: - # each item timeout exceed the max latency - break - else: - batch.append(item) - if (time.time() - start_time) > self.max_latency: - # total batch time exceeds the max latency - break - if not batch: - return 0 - - model_inputs = [i[3] for i in batch] - model_outputs = self.model_predict(model_inputs) - - # send results to response - for i, item in enumerate(batch): - client_id, task_id, request_id, _ = item - self._send_response(client_id, task_id, request_id, model_outputs[i]) - - batch_size = len(batch) - logger.info("[gpu worker %d] run_once batch_size: %d start_at: %s spend: %s" % ( - self.pid, batch_size, start_time, time.time() - start_time)) - return batch_size - - -class BaseStreamer(abc.ABC): - __slots__ = ('_input_queue', '_output_queue') - - def __init__(self, worker_timeout: float = TIMEOUT_WORKER, *args, **kwargs): - self._client_id = str(uuid.uuid4()) - self._task_id = 0 - self._future_cache = FutureCache() # {task_id: future} - self._worker_timeout = worker_timeout # kwargs.get("worker_timeout", TIMEOUT_WORKER) - - self.back_thread = threading.Thread(target=self._loop_collect_result, name="thread_collect_result") - self.back_thread.daemon = True - self.lock = threading.Lock() - - def _send_request(self, task_id, request_id, model_input): - self._input_queue.put((0, task_id, request_id, model_input)) - - def _recv_response(self, timeout=TIMEOUT): - try: - message = self._output_queue.get(timeout=timeout) - except queue.Empty: - message = None - return message - - @abc.abstractmethod - def destroy_workers(self): - raise NotImplementedError - - def 
_delay_setup(self): - self.back_thread.start() - - def _input(self, batch: list) -> int: - """input a batch, distribute each item to mq, return task_id""" - # task id in one client - self.lock.acquire() - task_id = self._task_id - self._task_id += 1 - self.lock.release() - # request id in one task - request_id = 0 - - future = Future(task_id, len(batch), weakref.ref(self._future_cache)) - self._future_cache[task_id] = future - - for model_input in batch: - self._send_request(task_id, request_id, model_input) - request_id += 1 - - return task_id - - def _loop_collect_result(self): - logger.info("start _loop_collect_result") - while True: - message = self._recv_response(timeout=TIMEOUT) - if message: - (task_id, request_id, item) = message - future = self._future_cache[task_id] - future.append_result(request_id, item) - else: - # todo - time.sleep(TIME_SLEEP) - - def _output(self, task_id: int) -> list: - future = self._future_cache[task_id] - batch_result = future.result(self._worker_timeout) - return batch_result - - def submit(self, batch): - task_id = self._input(batch) - future = self._future_cache[task_id] - return future - - def predict(self, batch): - task_id = self._input(batch) - ret = self._output(task_id) - assert len(batch) == len(ret), "input batch size {} and output batch size {} must be equal.".format(len(batch), len(ret)) - return ret diff --git a/src/aloha/service/streamer/managed_model.py b/src/aloha/service/streamer/managed_model.py deleted file mode 100644 index 1f5b386..0000000 --- a/src/aloha/service/streamer/managed_model.py +++ /dev/null @@ -1,22 +0,0 @@ -import abc -import os - - -class ManagedModel(abc.ABC): - def __init__(self, gpu_id=None, *args, **kwargs): - self.model = None - self.gpu_id = gpu_id - self.set_gpu_id(self.gpu_id) - - @staticmethod - def set_gpu_id(gpu_id=None): - if gpu_id is not None: - os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id) - - @abc.abstractmethod - def init_model(self, *args, **kwargs): - raise NotImplementedError - - @abc.abstractmethod - def predict(self, batch: list) -> list: - raise NotImplementedError diff --git a/src/aloha/service/streamer/multiprocess.py b/src/aloha/service/streamer/multiprocess.py deleted file mode 100644 index 9e4d848..0000000 --- a/src/aloha/service/streamer/multiprocess.py +++ /dev/null @@ -1,89 +0,0 @@ -import multiprocessing -import os - -from .base import BaseStreamer, BaseWorker, TIMEOUT_WORKER, logger -from .managed_model import ManagedModel - - -class MultiProcessWorker(BaseWorker): - def __init__(self, func_predict: callable, batch_size: int, max_latency: float, - request_queue, response_queue, model_init_args, model_init_kwargs: dict, *args, **kwargs): - super().__init__(func_predict, batch_size, max_latency, *args, **kwargs) - self._request_queue = request_queue - self._response_queue = response_queue - self._model_init_args = model_init_args or [] - self._model_init_kwargs = model_init_kwargs or {} - - def run_forever(self, gpu_id=None, ready_event=None, destroy_event=None): - # if it is a managed model, lazy init model after forked & set CUDA_VISIBLE_DEVICES - if isinstance(self.func_predict, type) and issubclass(self.func_predict, ManagedModel): - model_class = self.func_predict - logger.info("[gpu worker %d] init model on gpu:%s" % (os.getpid(), gpu_id)) - self._model = model_class(gpu_id) - self._model.init_model(*self._model_init_args, **self._model_init_kwargs) - logger.info("[gpu worker %d] init model on gpu:%s" % (os.getpid(), gpu_id)) - self._predict = self._model.predict - if 
ready_event: - ready_event.set() # tell father process that init is finished - if destroy_event is not None: - self.destroy_event = destroy_event - super().run_forever() - - -class MultiProcessStreamer(BaseStreamer): - def __init__(self, func_predict: callable, batch_size: int, max_latency: float = 0.1, worker_num: int = 1, - cuda_devices=None, model_init_args=None, model_init_kwargs: dict = None, wait_for_worker_ready: bool = False, - mp_start_method='spawn', worker_timeout: float = TIMEOUT_WORKER): - super().__init__(worker_timeout=worker_timeout) - - self.multiprocess = multiprocessing.get_context(mp_start_method) - self._input_queue = self.multiprocess.Queue() - self._output_queue = self.multiprocess.Queue() - - self.worker_num = worker_num - self.cuda_devices = cuda_devices - - self._worker = MultiProcessWorker( - func_predict, batch_size, max_latency, self._input_queue, self._output_queue, - model_init_args, model_init_kwargs - ) - self._worker_processes = [] - self._worker_ready_events = [] - self._worker_destroy_events = [] - self._setup_gpu_worker() - if wait_for_worker_ready: - self._wait_for_worker_ready() - self._delay_setup() - - def _setup_gpu_worker(self): - for i in range(self.worker_num): - ready_event = self.multiprocess.Event() - destroy_event = self.multiprocess.Event() - if self.cuda_devices is not None: - gpu_id = self.cuda_devices[i % len(self.cuda_devices)] - args = (gpu_id, ready_event, destroy_event) - else: - args = (None, ready_event, destroy_event) - p = self.multiprocess.Process(target=self._worker.run_forever, args=args, name="stream_worker", daemon=True) - p.start() - self._worker_processes.append(p) - self._worker_ready_events.append(ready_event) - self._worker_destroy_events.append(destroy_event) - - def _wait_for_worker_ready(self, timeout=None): - if timeout is None: - timeout = self._worker_timeout - # wait for all workers finishing init - for i, e in enumerate(self._worker_ready_events): - # todo: select all events with timeout - is_ready = e.wait(timeout) - logger.info("gpu worker:%d ready state: %s" % (i, is_ready)) - - def destroy_workers(self): - for e in self._worker_destroy_events: - e.set() - for p in self._worker_processes: - p.join(timeout=self._worker_timeout) - if p.is_alive(): - raise TimeoutError("worker_process destroy timeout") - logger.info("workers destroyed") diff --git a/src/aloha/service/streamer/redis.py b/src/aloha/service/streamer/redis.py deleted file mode 100644 index 3f32632..0000000 --- a/src/aloha/service/streamer/redis.py +++ /dev/null @@ -1,164 +0,0 @@ -import multiprocessing -import os -import pickle -import queue -import threading -import time - -from .base import BaseStreamer, BaseWorker, TIMEOUT, TIME_SLEEP, logger -from ...logger import LOG - -try: - from redis import Redis -except ImportError: - LOG.warn('redis not installed, service.streamer.RedisStreamer will no be available!') - - -class RedisWorker(BaseWorker): - def __init__(self, func_predict, batch_size: int, max_latency=0.1, - redis_broker="localhost:6379", prefix='', - model_init_args=None, model_init_kwargs=None, *args, **kwargs): - # assert issubclass(model_class, ManagedModel) - super().__init__(func_predict, batch_size, max_latency, *args, **kwargs) - self._request_queue = queue.Queue() - # redis worker does not need a response queue - - self.prefix = prefix - self._model_init_args = model_init_args or [] - self._model_init_kwargs = model_init_kwargs or {} - self._redis_broker = redis_broker - self._redis = _RedisServer(0, self._redis_broker, 
self.prefix) - - self.back_thread = threading.Thread(target=self._loop_recv_request, name="thread_recv_request") - self.back_thread.daemon = True - self.back_thread.start() - - def _send_response(self, client_id, task_id, request_id, model_output): - # override the parent method - self._redis.send_response(client_id, task_id, request_id, model_output) - - def run_forever(self, gpu_id=None): - logger.info("[gpu worker %d] init model on gpu:%s" % (os.getpid(), gpu_id)) - model_class = self.func_predict - self._model = model_class(gpu_id) - self._model.init_model(*self._model_init_args, **self._model_init_kwargs) - self._predict = self._model.predict - - super().run_forever() - - def _loop_recv_request(self): - logger.info("[gpu worker %d] start loop_recv_request" % (os.getpid())) - while True: - message = self._redis.recv_request(timeout=TIMEOUT) - if message: - (client_id, task_id, request_id, request_item) = pickle.loads(message) - self._request_queue.put((client_id, task_id, request_id, request_item)) - else: - # sleep if recv timeout - time.sleep(TIME_SLEEP) - - -class RedisStreamer(BaseStreamer): - """ - 1. input batch as a task - 2. distribute every single item in batch to redis - 3. backend loop collecting results - 3. output batch result for a task when every single item is returned - """ - - def __init__(self, redis_broker="localhost:6379", prefix=''): - super().__init__() - - # redis streamer does not need input_queue/output_queue - - self.prefix = prefix - self._redis_broker = redis_broker - self._redis = _RedisClient(self._client_id, self._redis_broker, self.prefix) - self._delay_setup() - - def _send_request(self, task_id, request_id, model_input): - self._redis.send_request(task_id, request_id, model_input) - - def _recv_response(self, timeout=TIMEOUT): - return self._redis.recv_response(timeout) - - def destroy_workers(self): - pass - - -def _setup_redis_worker_and_runforever( - model_class, batch_size, max_latency, gpu_id, redis_broker, prefix='', model_init_args=None, model_init_kwargs=None -): - redis_worker = RedisWorker( - model_class, batch_size, max_latency, redis_broker=redis_broker, prefix=prefix, - model_init_args=model_init_args, model_init_kwargs=model_init_kwargs - ) - redis_worker.run_forever(gpu_id) - - -def run_redis_workers_forever( - model_class, batch_size, max_latency=0.1, - worker_num=1, cuda_devices=None, redis_broker="localhost:6379", - prefix='', mp_start_method='spawn', model_init_args=None, model_init_kwargs=None -): - procs = [] - mp = multiprocessing.get_context(mp_start_method) - for i in range(worker_num): - if cuda_devices is not None: - gpu_id = cuda_devices[i % len(cuda_devices)] - else: - gpu_id = None - args = (model_class, batch_size, max_latency, gpu_id, redis_broker, prefix, model_init_args, model_init_kwargs) - p = mp.Process(target=_setup_redis_worker_and_runforever, args=args, name="stream_worker", daemon=True) - p.start() - procs.append(p) - - for p in procs: - p.join() - - -class _RedisAgent: - def __init__(self, redis_id, redis_broker='localhost:6379', prefix=''): - self._redis_id = redis_id - self._redis_host = redis_broker.split(":")[0] - self._redis_port = int(redis_broker.split(":")[1]) - self._redis_request_queue_name = "request_queue" + prefix - self._redis_response_pb_prefix = "response_pb_" + prefix - self._redis = Redis(host=self._redis_host, port=self._redis_port) - self._response_pb = self._redis.pubsub(ignore_subscribe_messages=True) - self._setup() - - def _setup(self): - raise NotImplementedError - - def 
_response_pb_name(self, redis_id): - return self._redis_response_pb_prefix + redis_id - - -class _RedisClient(_RedisAgent): - def _setup(self): - self._response_pb.subscribe(self._response_pb_name(self._redis_id)) - - def send_request(self, task_id, request_id, model_input): - message = (self._redis_id, task_id, request_id, model_input) - self._redis.lpush(self._redis_request_queue_name, pickle.dumps(message)) - - def recv_response(self, timeout): - message = self._response_pb.get_message(timeout=timeout) - if message: - return pickle.loads(message["data"]) - - -class _RedisServer(_RedisAgent): - def _setup(self): - self._response_pb.psubscribe(self._redis_response_pb_prefix + "*") # server subscribe all pubsub - - def recv_request(self, timeout): - message = self._redis.blpop(self._redis_request_queue_name, timeout=timeout) - if message: # (queue_name, data) - return message[1] - - def send_response(self, client_id, task_id, request_id, model_output): - message = (task_id, request_id, model_output) - channel_name = self._response_pb_name(client_id) - self._redis.publish(channel_name, pickle.dumps(message)) diff --git a/src/aloha/service/streamer/threaded.py b/src/aloha/service/streamer/threaded.py deleted file mode 100644 index e63f74d..0000000 --- a/src/aloha/service/streamer/threaded.py +++ /dev/null @@ -1,37 +0,0 @@ -import queue -import threading - -from .base import BaseStreamer, BaseWorker, TIMEOUT_WORKER, logger - - -class ThreadedWorker(BaseWorker): - def __init__(self, func_predict: callable, batch_size: int, max_latency: float, - request_queue, response_queue, *args, **kwargs): - super().__init__(func_predict, batch_size, max_latency, *args, **kwargs) - self._request_queue = request_queue - self._response_queue = response_queue - - -class ThreadedStreamer(BaseStreamer): - def __init__(self, predict_function: callable, batch_size: int, max_latency: float = 0.1, worker_timeout: float = TIMEOUT_WORKER): - super().__init__(worker_timeout=worker_timeout) - - self._input_queue = queue.Queue() - self._output_queue = queue.Queue() - - self._worker_destroy_event = threading.Event() - self._worker = ThreadedWorker( - predict_function, batch_size, max_latency, self._input_queue, self._output_queue, - destroy_event=self._worker_destroy_event - ) - self._worker_thread = threading.Thread(target=self._worker.run_forever, name="thread_worker") - self._worker_thread.daemon = True - self._worker_thread.start() - self._delay_setup() - - def destroy_workers(self): - self._worker_destroy_event.set() - self._worker_thread.join(timeout=self._worker_timeout) - if self._worker_thread.is_alive(): - raise TimeoutError("worker_thread destroy timeout") - logger.info("workers destroyed") diff --git a/src/setup.py b/src/setup.py index 2af3cd7..b0df72c 100644 --- a/src/setup.py +++ b/src/setup.py @@ -13,7 +13,7 @@ dict_extra_requires = { 'build': ['Cython'], - 'service': ['requests', 'tornado', 'psutil', 'pyjwt'], + 'service': ['requests', 'tornado', 'psutil', 'pyjwt', 'fastapi', 'httpx'], 'db': ['sqlalchemy', 'psycopg[binary]', 'pymysql', 'elasticsearch', 'pymongo', 'redis'], 'stream': ['confluent_kafka'], 'data': ['pandas'], diff --git a/tool/cicd/docker-compose.app-demo.DEV.yml b/tool/cicd/docker-compose.app-demo.DEV.yml index 6a87c37..ca06e80 100644 --- a/tool/cicd/docker-compose.app-demo.DEV.yml +++ b/tool/cicd/docker-compose.app-demo.DEV.yml @@ -1,5 +1,6 @@ name: ${PROJECT_NAME:-DEV-app-demo} + services: dev-app-demo: container_name: ${CONTAINER_NAME:-dev-app-demo} @@ -19,10 +20,11 @@ services: - 
ENV_PROFILE=${ENV_PROFILE:-DEV} # env_file: ["../credentials/DEV-app-demo.env"] user: "0:0" - ports: ["${PORT_APP:-40000}:9000", "${PORT_WEB:-43000}:3000"] + ports: ["${PORT_APP:-9000}:9000", "${PORT_WEB:-3000}:3000"] volumes: - ../../doc:/root/doc:rw - ../../notebook:/root/notebook:rw - ../../src:/root/src:rw - working_dir: /root/src + - ../../demo:/root/demo:rw + working_dir: /root command: "tail -f /dev/null" diff --git a/tool/cicd/docker-compose.db.yml b/tool/cicd/docker-compose.db.yml new file mode 100644 index 0000000..bd0e8ff --- /dev/null +++ b/tool/cicd/docker-compose.db.yml @@ -0,0 +1,28 @@ +name: com-common-db-postgres + +networks: + net-db-common: + name: net-db-common + external: false + + +services: + db-postgres-common: + container_name: db-postgres-common + hostname: db-postgres-common + image: quay.io/labnow/postgres-16-ext + pull_policy: if_not_present + restart: unless-stopped + environment: + POSTGRES_DB: sys + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + PG_CRON_DB: postgres + PG_PRELOAD_LIBS: pg_duckdb,pg_search,pg_cron + volumes: + - ${DIR_PG_DATA:-/data/data-pg-common}:/var/lib/postgresql/data:rw + - ../../src/resource/db:/data/mapp-ihealth/sql:rw + # - ./sql:/docker-entrypoint-initdb.d:rw + networks: ["net-db-common"] + ports: ["5432:5432"] + # su postgres && psql -d postgres -U postgres diff --git a/tool/cicd/run-dev.sh b/tool/cicd/run-dev.sh index 3d9020f..d0909f0 100755 --- a/tool/cicd/run-dev.sh +++ b/tool/cicd/run-dev.sh @@ -52,7 +52,6 @@ echo "Action: $ACTION" echo "Compose: $COMPOSE" echo "----------------------------------------" -docker network create net-ailab-proxy || docker network create net-db-common || true ; case "$ACTION" in up) From 5fcc3662f43889e6c2a13387593a034c384eff01 Mon Sep 17 00:00:00 2001 From: Bibo Hao Date: Wed, 24 Dec 2025 16:34:56 +0000 Subject: [PATCH 4/5] update db op --- .github/workflows/pip.yml | 6 ++---- src/aloha/db/oracle.py | 20 ++++++++------------ src/aloha/db/postgres.py | 2 +- tool/cicd/docker-compose.db.yml | 1 + 4 files changed, 12 insertions(+), 17 deletions(-) diff --git a/.github/workflows/pip.yml b/.github/workflows/pip.yml index f83bcf7..b240870 100644 --- a/.github/workflows/pip.yml +++ b/.github/workflows/pip.yml @@ -13,14 +13,12 @@ on: # Allows you to run this workflow manually from the Actions tab workflow_dispatch: -# A workflow run is made up of one or more jobs that can run sequentially or in parallel + jobs: - # This workflow contains a single job called "build" - build: + build-pypi-package: # The type of runner that the job will run on runs-on: ubuntu-latest - # Steps represent a sequence of tasks that will be executed as part of the job steps: # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it # sudo python setup.py install clean --all diff --git a/src/aloha/db/oracle.py b/src/aloha/db/oracle.py index 9c99b29..44b55bd 100644 --- a/src/aloha/db/oracle.py +++ b/src/aloha/db/oracle.py @@ -8,7 +8,7 @@ from .base import PasswordVault from ..logger import LOG -LOG.debug('Version of oracledb = %s' % oracledb.__version__) +LOG.debug('oracledb version = %s' % oracledb.__version__) class OracledbOperator: @@ -27,23 +27,22 @@ def __init__(self, db_config, **kwargs): } """ + password_vault = PasswordVault.get_vault(db_config.get('vault_type'), db_config.get('vault_config')) self._config = { 'host': db_config['host'], 'port': db_config['port'], 'user': db_config['user'], - 'password': db_config['password'], + 'password': password_vault.get_password(db_config.get('password')), } 
if 'lib_dir' in db_config: # use Thick mode try: oracledb.init_oracle_client(lib_dir=db_config['lib_dir']) - print("Oracle client initialized in THICK mode from: %s" % db_config['lib_dir']) + LOG.info("Oracle client initialized in THICK mode from: %s" % db_config['lib_dir']) except Exception as e: - print(f"Warning: {e}") + LOG.warning(f"Warning: {e}") raise RuntimeError(f"Failed to initialize Oracle client: {e}") - print("is_thin =", oracledb.is_thin_mode()) - service_name = db_config.get("service_name") sid = db_config.get("sid") @@ -59,22 +58,19 @@ def __init__(self, db_config, **kwargs): self.engine = create_engine( "oracle+oracledb://{user}:{password}@".format(**self._config), connect_args={"dsn": dsn}, - pool_size=20, - max_overflow=10, - pool_pre_ping=True, - **kwargs + pool_size=20, max_overflow=10, pool_pre_ping=True, **kwargs ) msg = "OracleDB connected: {host}:{port}".format(**self._config) print(msg) except Exception as e: - raise RuntimeError(f"Failed to connect to OracleDB: {e}") + LOG.error(e) + raise RuntimeError(f"Failed to connect to OracleDB") @property def connection(self): return self.engine def execute_query(self, sql, *args, **kwargs): - """returns SQLAlchemy CursorResult""" with self.engine.connect() as conn: cur = conn.execute(text(sql), *args, **kwargs) return cur diff --git a/src/aloha/db/postgres.py b/src/aloha/db/postgres.py index 76f030c..ba4f460 100644 --- a/src/aloha/db/postgres.py +++ b/src/aloha/db/postgres.py @@ -33,7 +33,7 @@ def __init__(self, db_config, **kwargs): LOG.debug("PostgresSQL connected: {host}:{port}/{dbname}".format(**self._config)) except Exception as e: LOG.error(e) - raise RuntimeError('Failed to connect to PostgresSQL') + raise RuntimeError("Failed to connect to PostgresSQL") @property def connection(self): diff --git a/tool/cicd/docker-compose.db.yml b/tool/cicd/docker-compose.db.yml index bd0e8ff..709b5a6 100644 --- a/tool/cicd/docker-compose.db.yml +++ b/tool/cicd/docker-compose.db.yml @@ -21,6 +21,7 @@ services: PG_PRELOAD_LIBS: pg_duckdb,pg_search,pg_cron volumes: - ${DIR_PG_DATA:-/data/data-pg-common}:/var/lib/postgresql/data:rw + # sudo chmod -R 777 /data/data-pg-common - ../../src/resource/db:/data/mapp-ihealth/sql:rw # - ./sql:/docker-entrypoint-initdb.d:rw networks: ["net-db-common"] From 6bc537ebe4a352b98bd7cfdf06ab151f9a5b93cb Mon Sep 17 00:00:00 2001 From: Bibo Hao Date: Wed, 24 Dec 2025 16:43:13 +0000 Subject: [PATCH 5/5] add vscode settings --- .vscode/extensions.json | 10 ++++++++++ .vscode/settings.json | 26 ++++++++++++++++++++++++++ src/aloha/db/oracle.py | 9 ++++----- 3 files changed, 40 insertions(+), 5 deletions(-) create mode 100644 .vscode/extensions.json create mode 100644 .vscode/settings.json diff --git a/.vscode/extensions.json b/.vscode/extensions.json new file mode 100644 index 0000000..6175c1d --- /dev/null +++ b/.vscode/extensions.json @@ -0,0 +1,10 @@ +{ + "recommendations": [ + "charliermarsh.ruff", + "dbcode.dbcode", + "esbenp.prettier-vscode", + "ms-python.python", + "ms-python.vscode-pylance" + ], + "unwantedRecommendations": [] +} diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..fb1ea69 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,26 @@ +{ + "editor.formatOnSave": true, + "files.exclude": { + "**/.git": true, + "**/node_modules": true, + "**/__pycache__": true, + "**/*.pyc": true + }, + "[python]": { + "editor.defaultFormatter": "ms-python.autopep8", + "editor.codeActionsOnSave": { + "source.organizeImports": "explicit", + "source.fixAll.ruff": 
"explicit" + } + }, + "[sql]": { + "editor.defaultFormatter": "dbcode.dbcode", // mtxr.sqltools, dbcode.dbcode, ReneSaarsoo.sql-formatter-vsc + "editor.formatOnSave": true + }, + "ruff.lineLength": 128, + "ruff.configuration": { + "format": { + "quote-style": "double" + } + } +} diff --git a/src/aloha/db/oracle.py b/src/aloha/db/oracle.py index 44b55bd..b929000 100644 --- a/src/aloha/db/oracle.py +++ b/src/aloha/db/oracle.py @@ -4,11 +4,10 @@ from sqlalchemy import create_engine from sqlalchemy.sql import text - from .base import PasswordVault from ..logger import LOG -LOG.debug('oracledb version = %s' % oracledb.__version__) +LOG.debug("oracledb version = %s" % oracledb.__version__) class OracledbOperator: @@ -38,7 +37,7 @@ def __init__(self, db_config, **kwargs): if 'lib_dir' in db_config: # use Thick mode try: oracledb.init_oracle_client(lib_dir=db_config['lib_dir']) - LOG.info("Oracle client initialized in THICK mode from: %s" % db_config['lib_dir']) + LOG.info("Oracle client initialized in THICK mode from: %s" % db_config["lib_dir"]) except Exception as e: LOG.warning(f"Warning: {e}") raise RuntimeError(f"Failed to initialize Oracle client: {e}") @@ -47,9 +46,9 @@ def __init__(self, db_config, **kwargs): sid = db_config.get("sid") if service_name: # using service_name (recommended) - dsn = oracledb.makedsn(db_config['host'], db_config['port'], service_name=service_name) + dsn = oracledb.makedsn(db_config["host"], db_config["port"], service_name=service_name) elif sid: # using SID - dsn = oracledb.makedsn(db_config['host'], db_config['port'], sid=sid) + dsn = oracledb.makedsn(db_config["host"], db_config["port"], sid=sid) else: raise ValueError("Oracle config must specify service_name or sid")