Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions setup_local_nfs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#!/bin/bash
# Local NFS Setup Script - Supports CentOS/RHEL/TencentOS/Ubuntu/Debian/Fedora
set -e

SHARE_PATH="/shared/rocksdb"
MOUNT_PATH="/mnt/rocksdb"
LOCAL_DB_PATH="./home/db"
CLEAN_MODE=false

# Detect OS and set variables
detect_os() {
[ -f /etc/os-release ] && . /etc/os-release || OS_ID="centos"
OS_ID="${ID:-$OS_ID}"

case "$OS_ID" in
ubuntu|debian)
PKG_MGR="apt-get"; NFS_PKG="nfs-kernel-server nfs-common"; NFS_SVC="nfs-kernel-server" ;;
centos|rhel|rocky|almalinux|ol|tencentos|tlinux|opencloudos|anolis)
PKG_MGR="yum"; NFS_PKG="nfs-utils"; NFS_SVC="nfs-server" ;;
fedora)
PKG_MGR="dnf"; NFS_PKG="nfs-utils"; NFS_SVC="nfs-server" ;;
*) echo "Unsupported OS: $OS_ID"; exit 1 ;;
esac
echo "OS: $OS_ID"
}

# Install NFS
install_nfs() {
echo "=== Installing NFS ==="
case "$PKG_MGR" in
apt-get) sudo apt-get update -qq && sudo DEBIAN_FRONTEND=noninteractive apt-get install -y $NFS_PKG ;;
*) sudo $PKG_MGR install -y $NFS_PKG ;;
esac
}

# Start NFS services
start_nfs() {
echo "=== Starting NFS ==="
sudo systemctl start rpcbind $NFS_SVC 2>/dev/null || true
sudo systemctl enable rpcbind $NFS_SVC 2>/dev/null || true
sleep 2
}

# Clean data
clean_data() {
echo "=== Cleaning Data ==="
echo "Share path: $SHARE_PATH"
echo "Local DB path: $LOCAL_DB_PATH"
rm -rf "$SHARE_PATH"/* 2>/dev/null || true
rm -rf "$LOCAL_DB_PATH"/* 2>/dev/null || true
echo "Done"
}

# Usage
usage() {
cat <<EOF
Usage: $0 [-s share_path] [-m mount_path] [-l local_db_path] [-c] [-h]
-s PATH Share directory (default: /shared/rocksdb)
-m PATH Mount point (default: /mnt/rocksdb)
-l PATH Local DB path for clean (default: ./home/db)
-c Clean mode only
-h Help
EOF
exit 0
}

# Parse args
while getopts "s:m:l:ch" opt; do
case $opt in
s) SHARE_PATH="$OPTARG" ;;
m) MOUNT_PATH="$OPTARG" ;;
l) LOCAL_DB_PATH="$OPTARG" ;;
c) CLEAN_MODE=true ;;
h) usage ;;
*) usage ;;
esac
done

# Clean mode
if [ "$CLEAN_MODE" = true ]; then
clean_data
exit 0
fi

echo "=== NFS Setup: $SHARE_PATH -> $MOUNT_PATH ==="
detect_os
install_nfs

# Create share dir
echo "=== Creating Directories ==="
sudo mkdir -p "$SHARE_PATH" "$MOUNT_PATH"
sudo chown -R "$(id -un):$(id -gn)" "$SHARE_PATH"
chmod 755 "$SHARE_PATH"

# Configure exports
echo "=== Configuring Exports ==="
EXPORT_LINE="$SHARE_PATH *(rw,sync,no_root_squash,no_subtree_check,insecure)"
grep -q "^$SHARE_PATH " /etc/exports 2>/dev/null && \
sudo sed -i "s|^$SHARE_PATH .*|$EXPORT_LINE|" /etc/exports || \
echo "$EXPORT_LINE" | sudo tee -a /etc/exports

start_nfs
sudo exportfs -arv
showmount -e localhost

# Mount
echo "=== Mounting ==="
mountpoint -q "$MOUNT_PATH" 2>/dev/null && sudo umount "$MOUNT_PATH"
sudo mount -t nfs localhost:"$SHARE_PATH" "$MOUNT_PATH"

# Verify
echo "=== Verifying ==="
echo 'test' > "$SHARE_PATH/test.txt" && cat "$MOUNT_PATH/test.txt" && rm "$MOUNT_PATH/test.txt"
echo "=== Complete: $SHARE_PATH mounted at $MOUNT_PATH ==="
25 changes: 23 additions & 2 deletions src/tendisplus/server/server_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ std::string removeQuotes(const std::string& v) {
}

auto tmp = v;
if (tmp[0] == '\"' && tmp[tmp.size() - 1] == '\"') {
if (tmp[0] == '"' && tmp[tmp.size() - 1] == '"') {
tmp = tmp.substr(1, tmp.size() - 2);
}
return tmp;
Expand All @@ -185,7 +185,7 @@ std::string removeQuotesAndToLower(const std::string& v) {
return tmp;
}

if (tmp[0] == '\"' && tmp[tmp.size() - 1] == '\"') {
if (tmp[0] == '"' && tmp[tmp.size() - 1] == '"') {
tmp = tmp.substr(1, tmp.size() - 2);
}
return tmp;
Expand Down Expand Up @@ -620,6 +620,27 @@ ServerParams::ServerParams() {
enableClosePubSubConnection);
REGISTER_VARS_DIFF_NAME_DYNAMIC("enable-move-pubsub-request",
enableMovePubSubRequest);

// Remote Compaction Service (CSA) Configuration
REGISTER_VARS_DIFF_NAME("csa_address", csaAddress);
// Note: remote_compaction.mode removed - only shared_storage mode is
// supported

// Shared filesystem configuration (unified for NFS/HDFS/S3/etc.)
REGISTER_VARS_DIFF_NAME("remote_compaction.shared_fs_uri",
remoteCompactionSharedFsUri);
REGISTER_VARS_DIFF_NAME("remote_compaction.shared_fs_local_prefix",
remoteCompactionSharedFsLocalPrefix);

// Remote Compaction Advanced Settings
REGISTER_VARS_DIFF_NAME("remote_compaction.max_concurrent_tasks",
remoteCompactionMaxConcurrentTasks);
REGISTER_VARS_DIFF_NAME("remote_compaction.grpc_max_message_size",
remoteCompactionGrpcMaxMessageSize);
REGISTER_VARS_DIFF_NAME("remote_compaction.check_time_interval",
remoteCompactionCheckTimeInterval);
REGISTER_VARS_DIFF_NAME("remote_compaction.max_reschedule",
remoteCompactionMaxReschedule);
}

ServerParams::~ServerParams() {
Expand Down
23 changes: 23 additions & 0 deletions src/tendisplus/server/server_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,29 @@ class ServerParams {

bool enableClosePubSubConnection = true;
bool enableMovePubSubRequest = true;

// Remote Compaction Service (CSA) Configuration
std::string csaAddress =
""; // CSA server address (host:port), e.g., "localhost:8010"
// Note: remote compaction only supports shared_storage mode (no configuration
// needed)

// Shared filesystem configuration (unified for NFS/HDFS/S3/etc.)
// URI format: "nfs://host/path", "hdfs://host:port/path",
// "s3://bucket/prefix"
std::string remoteCompactionSharedFsUri = ""; // Shared filesystem URI
std::string remoteCompactionSharedFsLocalPrefix =
""; // Optional local path prefix (for backward compatibility)

// Remote Compaction Advanced Settings
int64_t remoteCompactionMaxConcurrentTasks =
0; // Max concurrent compaction tasks (0 = use default)
int64_t remoteCompactionGrpcMaxMessageSize =
0; // Max gRPC message size in bytes (0 = use default 16MB)
int32_t remoteCompactionCheckTimeInterval =
0; // Check time interval in seconds (0 = use default)
uint64_t remoteCompactionMaxReschedule =
0; // Max reschedule times (0 = use default)
};

extern std::shared_ptr<tendisplus::ServerParams> gParams;
Expand Down
133 changes: 131 additions & 2 deletions src/tendisplus/storage/rocks/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,134 @@
add_library(rocks_kvstore STATIC rocks_kvstore.cpp rocks_kvttlcompactfilter.cpp)
target_link_libraries(rocks_kvstore utils_common kvstore rocksdb record glog ${SYS_LIBS} snappy lz4_static)
include(cmake/grpc.cmake)
# Proto file
get_filename_component(csa_proto "remote_compaction/protos/csa.proto" ABSOLUTE)
get_filename_component(csa_proto_path "${csa_proto}" PATH)

# Generated sources
set(csa_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/csa.pb.cc")
set(csa_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/csa.pb.h")
set(csa_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/csa.grpc.pb.cc")
set(csa_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/csa.grpc.pb.h")
add_custom_command(
OUTPUT "${csa_proto_srcs}" "${csa_proto_hdrs}" "${csa_grpc_srcs}" "${csa_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${csa_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${csa_proto}"
DEPENDS "${csa_proto}")

# Include generated *.pb.h files
include_directories("${CMAKE_CURRENT_BINARY_DIR}")

# csa_grpc_proto
add_library(csa_grpc_proto
${csa_grpc_srcs}
${csa_grpc_hdrs}
${csa_proto_srcs}
${csa_proto_hdrs})
target_link_libraries(csa_grpc_proto
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})

# compaction_service_helper
#add_library(compaction_service_helper
# "db/compaction/remote_compaction/job_distributor.h"
# "db/compaction/remote_compaction/job_distributor.cc")
#target_link_libraries(compaction_service_helper
# compaction_service_grpc_proto
# ${_REFLECTION}
# ${_GRPC_GRPCPP}
# ${_PROTOBUF_LIBPROTOBUF})

# Targets compaction_service_(client|server)
find_package(PkgConfig)
if(PkgConfig_FOUND)
pkg_check_modules(LIBNFS libnfs)
endif()

if(NOT LIBNFS_FOUND)
# 查找头文件
find_path(LIBNFS_INCLUDE_DIR
NAMES nfsc/libnfs.h
PATHS
/usr/include
/usr/local/include
/opt/libnfs/include
$ENV{LIBNFS_ROOT}/include
PATH_SUFFIXES nfs
)

# 查找库文件
find_library(LIBNFS_LIBRARY
NAMES nfs
PATHS
/usr/lib
/usr/local/lib
/usr/lib/x86_64-linux-gnu
/usr/lib64
/opt/libnfs/lib
$ENV{LIBNFS_ROOT}/lib
)

# 检查是否找到
if(LIBNFS_INCLUDE_DIR AND LIBNFS_LIBRARY)
set(LIBNFS_FOUND TRUE)
set(LIBNFS_INCLUDE_DIRS ${LIBNFS_INCLUDE_DIR})
set(LIBNFS_LIBRARIES ${LIBNFS_LIBRARY})
message(STATUS "Found libnfs: ${LIBNFS_LIBRARY}")
else()
message(FATAL_ERROR "libnfs not found! Please install libnfs-dev or set LIBNFS_ROOT environment variable")
endif()
endif()
add_library(nfs_fs STATIC nfs_filesystem.cc nfs_file.cc shared_filesystem.cc)
# 添加头文件路径
target_include_directories(nfs_fs
PUBLIC
${LIBNFS_INCLUDE_DIRS}
)

# 链接 libnfs
target_link_libraries(nfs_fs
PUBLIC
rocksdb
${LIBNFS_LIBRARIES}
)


foreach(_target csa_server)
add_executable(${_target}
"remote_compaction/${_target}.cc")
add_dependencies(${_target} csa_grpc_proto)
add_dependencies(${_target} nfs_fs)
target_link_libraries(${_target}
# compaction_service_helper
csa_grpc_proto
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF}
rocksdb
lz4_static
nfs_fs
)
endforeach()

# message(STATUS "askjdakshakjsdgajdhajgjdsh")
# add_executable(remote_test remote_test.cpp remote_util.h remote_util.cc)#ifndef NDEBUG
# target_compile_definitions(remote_test PUBLIC -DNDEBUG)
# target_link_libraries(remote_test rocksdb lz4_static)

# add_library(rocks_kvstore STATIC rocks_kvstore.cpp rocks_kvttlcompactfilter.cpp)
# target_link_libraries(rocks_kvstore utils_common kvstore rocksdb record glog ${SYS_LIBS} snappy lz4_static)

add_library(rocks_kvstore STATIC rocks_kvstore.cpp rocks_kvttlcompactfilter.cpp compaction_service.cc)
target_link_libraries(rocks_kvstore utils_common kvstore rocksdb record glog ${SYS_LIBS} snappy lz4_static csa_grpc_proto
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF}
nfs_fs
)

add_library(rocks_kvstore_for_test STATIC rocks_kvstore.cpp rocks_kvttlcompactfilter.cpp)
target_compile_definitions(rocks_kvstore_for_test PRIVATE -DNO_VERSIONEP)
Expand Down
Loading