1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions integration/pgdog.toml
@@ -21,6 +21,7 @@ healthcheck_port = 8080
tls_certificate = "integration/tls/cert.pem"
tls_private_key = "integration/tls/key.pem"
query_parser_engine = "pg_query_raw"
system_catalogs = "omnisharded_sticky"

[memory]
net_buffer = 8096
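
The new system_catalogs setting replaces the old boolean system_catalogs_omnisharded (see the pgdog-config changes below) and accepts "omnisharded", "omnisharded_sticky", or "sharded". A minimal sketch of how the TOML value maps onto the enum, assuming the serde and toml crates and the SystemCatalogsBehavior re-export added in pgdog-config/src/lib.rs; GeneralSubset is a hypothetical stand-in for the real General struct:

use pgdog_config::SystemCatalogsBehavior;
use serde::Deserialize;

// Hypothetical stand-in for pgdog-config's General struct, just to show the mapping.
#[derive(Deserialize)]
struct GeneralSubset {
    system_catalogs: SystemCatalogsBehavior,
}

fn main() {
    // "omnisharded_sticky" deserializes to SystemCatalogsBehavior::OmnishardedSticky
    // via #[serde(rename_all = "snake_case")] on the enum.
    let cfg: GeneralSubset = toml::from_str(r#"system_catalogs = "omnisharded_sticky""#).unwrap();
    assert_eq!(cfg.system_catalogs, SystemCatalogsBehavior::OmnishardedSticky);
}
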
4 changes: 4 additions & 0 deletions integration/schema_sync/.gitignore
@@ -1 +1,5 @@
.claude
destination.sql
source.sql
*.bak
diff.txt
41 changes: 27 additions & 14 deletions integration/schema_sync/dev.sh
@@ -45,18 +45,31 @@ for f in source.sql destination.sql; do
sed -i.bak '/^\\restrict.*$/d' $f
sed -i.bak '/^\\unrestrict.*$/d' $f
done
rm -f source.sql.bak destination.sql.bak

# Verify integer primary keys are rewritten to bigint, and no other differences exist
DIFF_OUTPUT=$(diff source.sql destination.sql || true)
echo "$DIFF_OUTPUT" | grep -q 'flag_id integer NOT NULL' || { echo "Expected flag_id integer->bigint rewrite"; exit 1; }
echo "$DIFF_OUTPUT" | grep -q 'flag_id bigint NOT NULL' || { echo "Expected flag_id integer->bigint rewrite"; exit 1; }
echo "$DIFF_OUTPUT" | grep -q 'setting_id integer NOT NULL' || { echo "Expected setting_id integer->bigint rewrite"; exit 1; }
echo "$DIFF_OUTPUT" | grep -q 'setting_id bigint NOT NULL' || { echo "Expected setting_id integer->bigint rewrite"; exit 1; }
sed -i.bak 's/flag_id integer NOT NULL/flag_id bigint NOT NULL/g' source.sql
sed -i.bak 's/setting_id integer NOT NULL/setting_id bigint NOT NULL/g' source.sql
rm -f source.sql.bak
diff source.sql destination.sql
rm source.sql
rm destination.sql

# Expected content changes (without line numbers for portability)
EXPECTED_CHANGES=$(cat <<EOF
< flag_id integer NOT NULL,
> flag_id bigint NOT NULL,
< setting_id integer NOT NULL,
> setting_id bigint NOT NULL,
< override_id integer NOT NULL,
> override_id bigint NOT NULL,
< flag_id integer NOT NULL,
> flag_id bigint NOT NULL,
EOF)

diff source.sql destination.sql > diff.txt || true

# Extract just the content lines (< and >) for comparison
ACTUAL_CHANGES=$(grep '^[<>]' diff.txt)
if [ "$ACTUAL_CHANGES" != "$EXPECTED_CHANGES" ]; then
echo "Schema diff does not match expected changes"
echo "=== Expected ==="
echo "$EXPECTED_CHANGES"
echo "=== Actual ==="
echo "$ACTUAL_CHANGES"
exit 1
fi

rm source.sql destination.sql diff.txt
popd
9 changes: 9 additions & 0 deletions integration/schema_sync/ecommerce_schema.sql
@@ -1005,3 +1005,12 @@ CREATE TABLE core.feature_flags (
    flag_name VARCHAR(100) NOT NULL UNIQUE,
    is_enabled BOOLEAN NOT NULL DEFAULT FALSE
);

CREATE TABLE core.user_feature_overrides (
    override_id SERIAL PRIMARY KEY,
    user_id BIGINT NOT NULL REFERENCES core.users(user_id) ON DELETE CASCADE,
    flag_id INTEGER NOT NULL REFERENCES core.feature_flags(flag_id) ON DELETE CASCADE,
    is_enabled BOOLEAN NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(user_id, flag_id)
);
1 change: 1 addition & 0 deletions pgdog-config/Cargo.toml
@@ -13,6 +13,7 @@ url = "2"
uuid = { version = "1", features = ["v4", "serde"] }
rand = "*"
pgdog-vector = { path = "../pgdog-vector" }
once_cell = "*"

[dev-dependencies]
tempfile = "3.23.0"
42 changes: 15 additions & 27 deletions pgdog-config/src/core.rs
@@ -6,8 +6,9 @@ use tracing::{info, warn};

use crate::sharding::ShardedSchema;
use crate::{
EnumeratedDatabase, Memory, OmnishardedTable, PassthoughAuth, PreparedStatements,
QueryParserEngine, QueryParserLevel, ReadWriteSplit, RewriteMode, Role,
system_catalogs, EnumeratedDatabase, Memory, OmnishardedTable, PassthoughAuth,
PreparedStatements, QueryParserEngine, QueryParserLevel, ReadWriteSplit, RewriteMode, Role,
SystemCatalogsBehavior,
};

use super::database::Database;
@@ -249,32 +250,19 @@ impl Config {

// Automatically configure system catalogs
// as omnisharded.
if self.general.system_catalogs_omnisharded {
if self.general.system_catalogs != SystemCatalogsBehavior::Sharded {
let sticky_routing = matches!(
self.general.system_catalogs,
SystemCatalogsBehavior::OmnishardedSticky
);
for database in databases {
let entry = tables.entry(database).or_insert_with(Vec::new);

for table in [
"pg_class",
"pg_attribute",
"pg_attrdef",
"pg_index",
"pg_constraint",
"pg_namespace",
"pg_database",
"pg_tablespace",
"pg_type",
"pg_proc",
"pg_operator",
"pg_cast",
"pg_enum",
"pg_range",
"pg_authid",
"pg_am",
] {
if entry.iter().find(|t| t.name == table).is_none() {
for table in system_catalogs() {
if entry.iter().find(|t| t.name == *table).is_none() {
entry.push(OmnishardedTable {
name: table.to_string(),
sticky_routing: true,
sticky_routing,
});
}
}
@@ -765,7 +753,7 @@ password = "users_admin_password"
[general]
host = "0.0.0.0"
port = 6432
system_catalogs_omnisharded = false
system_catalogs = "sharded"

[[databases]]
name = "db1"
@@ -821,7 +809,7 @@ tables = ["table_x"]
[general]
host = "0.0.0.0"
port = 6432
system_catalogs_omnisharded = true
system_catalogs = "omnisharded_sticky"

[[databases]]
name = "db1"
@@ -848,12 +836,12 @@ tables = ["my_table"]
let pg_class = db1_tables.iter().find(|t| t.name == "pg_class").unwrap();
assert!(pg_class.sticky_routing);

// Test with system_catalogs_omnisharded = false
// Test with system_catalogs = "sharded" (no omnisharding)
let source_disabled = r#"
[general]
host = "0.0.0.0"
port = 6432
system_catalogs_omnisharded = false
system_catalogs = "sharded"

[[databases]]
name = "db1"
12 changes: 6 additions & 6 deletions pgdog-config/src/general.rs
@@ -5,7 +5,7 @@ use std::path::PathBuf;
use std::time::Duration;

use crate::pooling::ConnectionRecovery;
use crate::{QueryParserEngine, QueryParserLevel};
use crate::{QueryParserEngine, QueryParserLevel, SystemCatalogsBehavior};

use super::auth::{AuthType, PassthoughAuth};
use super::database::{LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy};
@@ -195,8 +195,8 @@ pub struct General {
#[serde(default)]
pub unique_id_min: u64,
/// System catalogs are omnisharded?
#[serde(default = "General::default_system_catalogs_omnisharded")]
pub system_catalogs_omnisharded: bool,
#[serde(default = "General::default_system_catalogs")]
pub system_catalogs: SystemCatalogsBehavior,
/// Omnisharded queries are sticky by default.
#[serde(default)]
pub omnisharded_sticky: bool,
@@ -268,7 +268,7 @@ impl Default for General {
lsn_check_timeout: Self::lsn_check_timeout(),
lsn_check_delay: Self::lsn_check_delay(),
unique_id_min: u64::default(),
system_catalogs_omnisharded: Self::default_system_catalogs_omnisharded(),
system_catalogs: Self::default_system_catalogs(),
omnisharded_sticky: bool::default(),
}
}
@@ -429,8 +429,8 @@ impl General {
Self::env_or_default("PGDOG_SHUTDOWN_TIMEOUT", 60_000)
}

fn default_system_catalogs_omnisharded() -> bool {
Self::env_or_default("PGDOG_SYSTEM_CATALOGS_OMNISHARDED", true)
fn default_system_catalogs() -> SystemCatalogsBehavior {
Self::env_enum_or_default("PGDOG_SYSTEM_CATALOGS")
}

fn default_shutdown_termination_timeout() -> Option<u64> {
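
default_system_catalogs now reads the PGDOG_SYSTEM_CATALOGS environment variable through env_enum_or_default, whose body is not part of this diff. A hypothetical sketch of such a helper, assuming it leans on the enum's FromStr and Default impls (the real pgdog implementation may differ):

// Hypothetical helper, NOT the actual pgdog implementation: read an enum value
// from the environment, falling back to its Default when the variable is unset
// or unparsable (OmnishardedSticky for SystemCatalogsBehavior).
fn env_enum_or_default<T: std::str::FromStr + Default>(name: &str) -> T {
    std::env::var(name)
        .ok()
        .and_then(|value| value.parse::<T>().ok())
        .unwrap_or_default()
}

Under that assumption, exporting PGDOG_SYSTEM_CATALOGS=sharded would resolve the default to SystemCatalogsBehavior::Sharded, while leaving the variable unset keeps OmnishardedSticky.
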
4 changes: 3 additions & 1 deletion pgdog-config/src/lib.rs
@@ -12,6 +12,7 @@ pub mod pooling;
pub mod replication;
pub mod rewrite;
pub mod sharding;
pub mod system_catalogs;
pub mod url;
pub mod users;
pub mod util;
@@ -31,6 +32,7 @@ pub use pooling::{PoolerMode, PreparedStatements};
pub use replication::*;
pub use rewrite::{Rewrite, RewriteMode};
pub use sharding::*;
pub use system_catalogs::system_catalogs;
pub use users::{Admin, Plugin, User, Users};

use std::time::Duration;
@@ -49,7 +51,7 @@ mod test {

#[test]
fn test_max_duration() {
assert!(MAX_DURATION > Duration::from_hours(24 * 7 * 52 * 100)); // 100 years
assert!(MAX_DURATION > Duration::from_secs(24 * 7 * 52 * 100 * 3600)); // 100 years
assert_eq!(MAX_DURATION.as_millis() as i64, i64::MAX);

#[derive(Serialize)]
22 changes: 22 additions & 0 deletions pgdog-config/src/sharding.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::collections::{hash_map::DefaultHasher, HashSet};
use std::hash::{Hash, Hasher as StdHasher};
use std::path::PathBuf;
use std::str::FromStr;
use tracing::{info, warn};

use super::error::Error;
@@ -314,3 +315,24 @@ pub enum QueryParserEngine {
PgQueryProtobuf,
PgQueryRaw,
}

#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash, Default)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum SystemCatalogsBehavior {
    Omnisharded,
    #[default]
    OmnishardedSticky,
    Sharded,
}

impl FromStr for SystemCatalogsBehavior {
    type Err = ();
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s.to_lowercase().as_str() {
            "omnisharded" => Self::Omnisharded,
            "omnisharded_sticky" => Self::OmnishardedSticky,
            "sharded" => Self::Sharded,
            _ => return Err(()),
        })
    }
}
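
A short usage sketch of the new enum (illustrative, not part of the PR), covering the three accepted spellings and the default used when the setting is omitted:

#[test]
fn parses_system_catalogs_behavior() {
    // FromStr lowercases its input, so parsing is case-insensitive.
    assert_eq!(
        "omnisharded_sticky".parse::<SystemCatalogsBehavior>(),
        Ok(SystemCatalogsBehavior::OmnishardedSticky)
    );
    assert_eq!(
        "SHARDED".parse::<SystemCatalogsBehavior>(),
        Ok(SystemCatalogsBehavior::Sharded)
    );
    assert!("bogus".parse::<SystemCatalogsBehavior>().is_err());
    // Serde uses the same snake_case spellings in pgdog.toml, and the
    // #[default] variant is OmnishardedSticky.
    assert_eq!(
        SystemCatalogsBehavior::default(),
        SystemCatalogsBehavior::OmnishardedSticky
    );
}
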
29 changes: 29 additions & 0 deletions pgdog-config/src/system_catalogs.rs
@@ -0,0 +1,29 @@
use once_cell::sync::Lazy;
use std::{collections::HashSet, ops::Deref};

const CATALOGS: &[&str] = &[
"pg_class",
"pg_attribute",
"pg_attrdef",
"pg_index",
"pg_constraint",
"pg_namespace",
"pg_database",
"pg_tablespace",
"pg_type",
"pg_proc",
"pg_operator",
"pg_cast",
"pg_enum",
"pg_range",
"pg_authid",
"pg_am",
];

static SYSTEM_CATALOGS: Lazy<HashSet<&'static str>> =
Lazy::new(|| CATALOGS.into_iter().map(|s| *s).collect());

/// Get a list of system catalogs that we care about.
pub fn system_catalogs() -> &'static HashSet<&'static str> {
SYSTEM_CATALOGS.deref()
}
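
This module centralizes the catalog list that core.rs previously hard-coded inline. A small usage sketch (illustrative, not part of the PR):

#[test]
fn catalog_set_lookup() {
    // Membership checks against the shared set; core.rs iterates the same
    // set when registering system catalogs as omnisharded tables.
    assert!(system_catalogs().contains("pg_class"));
    assert!(!system_catalogs().contains("users"));
    assert_eq!(system_catalogs().len(), 16); // the 16 catalogs listed above
}
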
1 change: 1 addition & 0 deletions pgdog/src/backend/databases.rs
@@ -446,6 +446,7 @@ fn new_pool(user: &crate::config::User, config: &crate::config::Config) -> Optio
sharded_tables,
omnisharded_tables,
general.omnisharded_sticky,
general.system_catalogs,
);
let sharded_schemas = ShardedSchemas::new(sharded_schemas);

1 change: 1 addition & 0 deletions pgdog/src/backend/pool/cluster.rs
@@ -667,6 +667,7 @@ mod test {
},
],
config.config.general.omnisharded_sticky,
config.config.general.system_catalogs,
),
sharded_schemas: ShardedSchemas::new(vec![
ShardedSchema {
17 changes: 15 additions & 2 deletions pgdog/src/backend/replication/sharded_tables.rs
@@ -1,5 +1,5 @@
//! Tables sharded in the database.
use pgdog_config::OmnishardedTable;
use pgdog_config::{OmnishardedTable, SystemCatalogsBehavior};

use crate::{
config::{DataType, ShardedTable},
@@ -20,6 +20,7 @@ struct Inner {
/// and list/range/hash function.
common_mapping: Option<CommonMapping>,
omnisharded_sticky: bool,
system_catalogs: SystemCatalogsBehavior,
}

#[derive(Debug)]
@@ -47,7 +48,12 @@ impl Default for ShardedTables {

impl From<&[ShardedTable]> for ShardedTables {
fn from(value: &[ShardedTable]) -> Self {
Self::new(value.to_vec(), vec![], false)
Self::new(
value.to_vec(),
vec![],
false,
SystemCatalogsBehavior::default(),
)
}
}

@@ -56,6 +62,7 @@ impl ShardedTables {
tables: Vec<ShardedTable>,
omnisharded_tables: Vec<OmnishardedTable>,
omnisharded_sticky: bool,
system_catalogs: SystemCatalogsBehavior,
) -> Self {
let mut common_mapping = HashSet::new();
for table in &tables {
@@ -85,6 +92,7 @@ impl ShardedTables {
.collect(),
common_mapping,
omnisharded_sticky,
system_catalogs,
}),
}
}
@@ -105,6 +113,11 @@ impl ShardedTables {
self.inner.omnisharded_sticky
}

/// System catalogs are to be joined across shards.
pub fn is_system_catalog_sharded(&self) -> bool {
self.inner.system_catalogs == SystemCatalogsBehavior::Sharded
}

/// The deployment has only one sharded table.
pub fn common_mapping(&self) -> &Option<CommonMapping> {
&self.inner.common_mapping
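
ShardedTables::new now takes the behavior as a fourth argument, which databases.rs and cluster.rs above thread through from general.system_catalogs. A test-style sketch of the widened surface (illustrative, not part of the PR):

#[test]
fn system_catalogs_behavior_is_threaded_through() {
    let tables = ShardedTables::new(
        vec![],                          // sharded tables
        vec![],                          // omnisharded tables
        false,                           // omnisharded_sticky
        SystemCatalogsBehavior::Sharded, // the new parameter
    );
    assert!(tables.is_system_catalog_sharded());

    // The From<&[ShardedTable]> impl falls back to SystemCatalogsBehavior::default(),
    // i.e. OmnishardedSticky, so catalogs are not treated as sharded.
    let empty: &[ShardedTable] = &[];
    assert!(!ShardedTables::from(empty).is_system_catalog_sharded());
}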