diff --git a/Cargo.lock b/Cargo.lock index b20554fa..780e2017 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2459,6 +2459,7 @@ dependencies = [ name = "pgdog-config" version = "0.1.0" dependencies = [ + "once_cell", "pgdog-vector", "rand 0.9.2", "serde", diff --git a/integration/pgdog.toml b/integration/pgdog.toml index f2979f96..cfd5a92d 100644 --- a/integration/pgdog.toml +++ b/integration/pgdog.toml @@ -21,6 +21,7 @@ healthcheck_port = 8080 tls_certificate = "integration/tls/cert.pem" tls_private_key = "integration/tls/key.pem" query_parser_engine = "pg_query_raw" +system_catalogs = "omnisharded_sticky" [memory] net_buffer = 8096 diff --git a/integration/schema_sync/.gitignore b/integration/schema_sync/.gitignore index 5eec9860..05554b03 100644 --- a/integration/schema_sync/.gitignore +++ b/integration/schema_sync/.gitignore @@ -1 +1,5 @@ .claude +destination.sql +source.sql +*.bak +diff.txt diff --git a/integration/schema_sync/dev.sh b/integration/schema_sync/dev.sh index 39a83f44..0092bf54 100644 --- a/integration/schema_sync/dev.sh +++ b/integration/schema_sync/dev.sh @@ -45,18 +45,31 @@ for f in source.sql destination.sql; do sed -i.bak '/^\\restrict.*$/d' $f sed -i.bak '/^\\unrestrict.*$/d' $f done -rm -f source.sql.bak destination.sql.bak - -# Verify integer primary keys are rewritten to bigint, and no other differences exist -DIFF_OUTPUT=$(diff source.sql destination.sql || true) -echo "$DIFF_OUTPUT" | grep -q 'flag_id integer NOT NULL' || { echo "Expected flag_id integer->bigint rewrite"; exit 1; } -echo "$DIFF_OUTPUT" | grep -q 'flag_id bigint NOT NULL' || { echo "Expected flag_id integer->bigint rewrite"; exit 1; } -echo "$DIFF_OUTPUT" | grep -q 'setting_id integer NOT NULL' || { echo "Expected setting_id integer->bigint rewrite"; exit 1; } -echo "$DIFF_OUTPUT" | grep -q 'setting_id bigint NOT NULL' || { echo "Expected setting_id integer->bigint rewrite"; exit 1; } -sed -i.bak 's/flag_id integer NOT NULL/flag_id bigint NOT NULL/g' source.sql -sed -i.bak 's/setting_id integer NOT NULL/setting_id bigint NOT NULL/g' source.sql -rm -f source.sql.bak -diff source.sql destination.sql -rm source.sql -rm destination.sql + +# Expected content changes (without line numbers for portability) +EXPECTED_CHANGES=$(cat < flag_id bigint NOT NULL, +< setting_id integer NOT NULL, +> setting_id bigint NOT NULL, +< override_id integer NOT NULL, +> override_id bigint NOT NULL, +< flag_id integer NOT NULL, +> flag_id bigint NOT NULL, +EOF) + +diff source.sql destination.sql > diff.txt || true + +# Extract just the content lines (< and >) for comparison +ACTUAL_CHANGES=$(grep '^[<>]' diff.txt) +if [ "$ACTUAL_CHANGES" != "$EXPECTED_CHANGES" ]; then + echo "Schema diff does not match expected changes" + echo "=== Expected ===" + echo "$EXPECTED_CHANGES" + echo "=== Actual ===" + echo "$ACTUAL_CHANGES" + exit 1 +fi + +rm source.sql destination.sql diff.txt popd diff --git a/integration/schema_sync/ecommerce_schema.sql b/integration/schema_sync/ecommerce_schema.sql index 5c1be03b..c18e5e4c 100644 --- a/integration/schema_sync/ecommerce_schema.sql +++ b/integration/schema_sync/ecommerce_schema.sql @@ -1005,3 +1005,12 @@ CREATE TABLE core.feature_flags ( flag_name VARCHAR(100) NOT NULL UNIQUE, is_enabled BOOLEAN NOT NULL DEFAULT FALSE ); + +CREATE TABLE core.user_feature_overrides ( + override_id SERIAL PRIMARY KEY, + user_id BIGINT NOT NULL REFERENCES core.users(user_id) ON DELETE CASCADE, + flag_id INTEGER NOT NULL REFERENCES core.feature_flags(flag_id) ON DELETE CASCADE, + is_enabled BOOLEAN NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(user_id, flag_id) +); diff --git a/pgdog-config/Cargo.toml b/pgdog-config/Cargo.toml index 0b34ec2c..86fedc86 100644 --- a/pgdog-config/Cargo.toml +++ b/pgdog-config/Cargo.toml @@ -13,6 +13,7 @@ url = "2" uuid = { version = "1", features = ["v4", "serde"] } rand = "*" pgdog-vector = { path = "../pgdog-vector" } +once_cell = "*" [dev-dependencies] tempfile = "3.23.0" diff --git a/pgdog-config/src/core.rs b/pgdog-config/src/core.rs index 5b680f0e..cb7fd7b0 100644 --- a/pgdog-config/src/core.rs +++ b/pgdog-config/src/core.rs @@ -6,8 +6,9 @@ use tracing::{info, warn}; use crate::sharding::ShardedSchema; use crate::{ - EnumeratedDatabase, Memory, OmnishardedTable, PassthoughAuth, PreparedStatements, - QueryParserEngine, QueryParserLevel, ReadWriteSplit, RewriteMode, Role, + system_catalogs, EnumeratedDatabase, Memory, OmnishardedTable, PassthoughAuth, + PreparedStatements, QueryParserEngine, QueryParserLevel, ReadWriteSplit, RewriteMode, Role, + SystemCatalogsBehavior, }; use super::database::Database; @@ -249,32 +250,19 @@ impl Config { // Automatically configure system catalogs // as omnisharded. - if self.general.system_catalogs_omnisharded { + if self.general.system_catalogs != SystemCatalogsBehavior::Sharded { + let sticky_routing = matches!( + self.general.system_catalogs, + SystemCatalogsBehavior::OmnishardedSticky + ); for database in databases { let entry = tables.entry(database).or_insert_with(Vec::new); - for table in [ - "pg_class", - "pg_attribute", - "pg_attrdef", - "pg_index", - "pg_constraint", - "pg_namespace", - "pg_database", - "pg_tablespace", - "pg_type", - "pg_proc", - "pg_operator", - "pg_cast", - "pg_enum", - "pg_range", - "pg_authid", - "pg_am", - ] { - if entry.iter().find(|t| t.name == table).is_none() { + for table in system_catalogs() { + if entry.iter().find(|t| t.name == *table).is_none() { entry.push(OmnishardedTable { name: table.to_string(), - sticky_routing: true, + sticky_routing, }); } } @@ -765,7 +753,7 @@ password = "users_admin_password" [general] host = "0.0.0.0" port = 6432 -system_catalogs_omnisharded = false +system_catalogs = "sharded" [[databases]] name = "db1" @@ -821,7 +809,7 @@ tables = ["table_x"] [general] host = "0.0.0.0" port = 6432 -system_catalogs_omnisharded = true +system_catalogs = "omnisharded_sticky" [[databases]] name = "db1" @@ -848,12 +836,12 @@ tables = ["my_table"] let pg_class = db1_tables.iter().find(|t| t.name == "pg_class").unwrap(); assert!(pg_class.sticky_routing); - // Test with system_catalogs_omnisharded = false + // Test with system_catalogs = "sharded" (no omnisharding) let source_disabled = r#" [general] host = "0.0.0.0" port = 6432 -system_catalogs_omnisharded = false +system_catalogs = "sharded" [[databases]] name = "db1" diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index 725f11d3..eca4846f 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use std::time::Duration; use crate::pooling::ConnectionRecovery; -use crate::{QueryParserEngine, QueryParserLevel}; +use crate::{QueryParserEngine, QueryParserLevel, SystemCatalogsBehavior}; use super::auth::{AuthType, PassthoughAuth}; use super::database::{LoadBalancingStrategy, ReadWriteSplit, ReadWriteStrategy}; @@ -195,8 +195,8 @@ pub struct General { #[serde(default)] pub unique_id_min: u64, /// System catalogs are omnisharded? - #[serde(default = "General::default_system_catalogs_omnisharded")] - pub system_catalogs_omnisharded: bool, + #[serde(default = "General::default_system_catalogs")] + pub system_catalogs: SystemCatalogsBehavior, /// Omnisharded queries are sticky by default. #[serde(default)] pub omnisharded_sticky: bool, @@ -268,7 +268,7 @@ impl Default for General { lsn_check_timeout: Self::lsn_check_timeout(), lsn_check_delay: Self::lsn_check_delay(), unique_id_min: u64::default(), - system_catalogs_omnisharded: Self::default_system_catalogs_omnisharded(), + system_catalogs: Self::default_system_catalogs(), omnisharded_sticky: bool::default(), } } @@ -429,8 +429,8 @@ impl General { Self::env_or_default("PGDOG_SHUTDOWN_TIMEOUT", 60_000) } - fn default_system_catalogs_omnisharded() -> bool { - Self::env_or_default("PGDOG_SYSTEM_CATALOGS_OMNISHARDED", true) + fn default_system_catalogs() -> SystemCatalogsBehavior { + Self::env_enum_or_default("PGDOG_SYSTEM_CATALOGS") } fn default_shutdown_termination_timeout() -> Option { diff --git a/pgdog-config/src/lib.rs b/pgdog-config/src/lib.rs index 560c5fca..a08b302c 100644 --- a/pgdog-config/src/lib.rs +++ b/pgdog-config/src/lib.rs @@ -12,6 +12,7 @@ pub mod pooling; pub mod replication; pub mod rewrite; pub mod sharding; +pub mod system_catalogs; pub mod url; pub mod users; pub mod util; @@ -31,6 +32,7 @@ pub use pooling::{PoolerMode, PreparedStatements}; pub use replication::*; pub use rewrite::{Rewrite, RewriteMode}; pub use sharding::*; +pub use system_catalogs::system_catalogs; pub use users::{Admin, Plugin, User, Users}; use std::time::Duration; @@ -49,7 +51,7 @@ mod test { #[test] fn test_max_duration() { - assert!(MAX_DURATION > Duration::from_hours(24 * 7 * 52 * 100)); // 100 years + assert!(MAX_DURATION > Duration::from_secs(24 * 7 * 52 * 100 * 3600)); // 100 years assert_eq!(MAX_DURATION.as_millis() as i64, i64::MAX); #[derive(Serialize)] diff --git a/pgdog-config/src/sharding.rs b/pgdog-config/src/sharding.rs index 4dd177e4..825e4dc4 100644 --- a/pgdog-config/src/sharding.rs +++ b/pgdog-config/src/sharding.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::collections::{hash_map::DefaultHasher, HashSet}; use std::hash::{Hash, Hasher as StdHasher}; use std::path::PathBuf; +use std::str::FromStr; use tracing::{info, warn}; use super::error::Error; @@ -314,3 +315,24 @@ pub enum QueryParserEngine { PgQueryProtobuf, PgQueryRaw, } + +#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, Hash, Default)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub enum SystemCatalogsBehavior { + Omnisharded, + #[default] + OmnishardedSticky, + Sharded, +} + +impl FromStr for SystemCatalogsBehavior { + type Err = (); + fn from_str(s: &str) -> Result { + Ok(match s.to_lowercase().as_str() { + "omnisharded" => Self::Omnisharded, + "omnisharded_sticky" => Self::OmnishardedSticky, + "sharded" => Self::Sharded, + _ => return Err(()), + }) + } +} diff --git a/pgdog-config/src/system_catalogs.rs b/pgdog-config/src/system_catalogs.rs new file mode 100644 index 00000000..50c1b4b7 --- /dev/null +++ b/pgdog-config/src/system_catalogs.rs @@ -0,0 +1,29 @@ +use once_cell::sync::Lazy; +use std::{collections::HashSet, ops::Deref}; + +const CATALOGS: &[&str] = &[ + "pg_class", + "pg_attribute", + "pg_attrdef", + "pg_index", + "pg_constraint", + "pg_namespace", + "pg_database", + "pg_tablespace", + "pg_type", + "pg_proc", + "pg_operator", + "pg_cast", + "pg_enum", + "pg_range", + "pg_authid", + "pg_am", +]; + +static SYSTEM_CATALOGS: Lazy> = + Lazy::new(|| CATALOGS.into_iter().map(|s| *s).collect()); + +/// Get a list of system catalogs that we care about. +pub fn system_catalogs() -> &'static HashSet<&'static str> { + SYSTEM_CATALOGS.deref() +} diff --git a/pgdog/src/backend/databases.rs b/pgdog/src/backend/databases.rs index 980c3386..d8028c35 100644 --- a/pgdog/src/backend/databases.rs +++ b/pgdog/src/backend/databases.rs @@ -446,6 +446,7 @@ fn new_pool(user: &crate::config::User, config: &crate::config::Config) -> Optio sharded_tables, omnisharded_tables, general.omnisharded_sticky, + general.system_catalogs, ); let sharded_schemas = ShardedSchemas::new(sharded_schemas); diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 80e81498..20cf477e 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -667,6 +667,7 @@ mod test { }, ], config.config.general.omnisharded_sticky, + config.config.general.system_catalogs, ), sharded_schemas: ShardedSchemas::new(vec![ ShardedSchema { diff --git a/pgdog/src/backend/replication/sharded_tables.rs b/pgdog/src/backend/replication/sharded_tables.rs index 7cfea687..75547c0c 100644 --- a/pgdog/src/backend/replication/sharded_tables.rs +++ b/pgdog/src/backend/replication/sharded_tables.rs @@ -1,5 +1,5 @@ //! Tables sharded in the database. -use pgdog_config::OmnishardedTable; +use pgdog_config::{OmnishardedTable, SystemCatalogsBehavior}; use crate::{ config::{DataType, ShardedTable}, @@ -20,6 +20,7 @@ struct Inner { /// and list/range/hash function. common_mapping: Option, omnisharded_sticky: bool, + system_catalogs: SystemCatalogsBehavior, } #[derive(Debug)] @@ -47,7 +48,12 @@ impl Default for ShardedTables { impl From<&[ShardedTable]> for ShardedTables { fn from(value: &[ShardedTable]) -> Self { - Self::new(value.to_vec(), vec![], false) + Self::new( + value.to_vec(), + vec![], + false, + SystemCatalogsBehavior::default(), + ) } } @@ -56,6 +62,7 @@ impl ShardedTables { tables: Vec, omnisharded_tables: Vec, omnisharded_sticky: bool, + system_catalogs: SystemCatalogsBehavior, ) -> Self { let mut common_mapping = HashSet::new(); for table in &tables { @@ -85,6 +92,7 @@ impl ShardedTables { .collect(), common_mapping, omnisharded_sticky, + system_catalogs, }), } } @@ -105,6 +113,11 @@ impl ShardedTables { self.inner.omnisharded_sticky } + /// System catalogs are to be joined across shards. + pub fn is_system_catalog_sharded(&self) -> bool { + self.inner.system_catalogs == SystemCatalogsBehavior::Sharded + } + /// The deployment has only one sharded table. pub fn common_mapping(&self) -> &Option { &self.inner.common_mapping diff --git a/pgdog/src/backend/schema/sync/pg_dump.rs b/pgdog/src/backend/schema/sync/pg_dump.rs index ec1bd703..6c4bbc95 100644 --- a/pgdog/src/backend/schema/sync/pg_dump.rs +++ b/pgdog/src/backend/schema/sync/pg_dump.rs @@ -9,8 +9,8 @@ use std::{ use lazy_static::lazy_static; use pg_query::{ protobuf::{ - AlterTableCmd, AlterTableStmt, AlterTableType, ColumnDef, ConstrType, ObjectType, - ParseResult, RangeVar, String as PgString, TypeName, + AlterTableCmd, AlterTableStmt, AlterTableType, ColumnDef, ConstrType, Constraint, + ObjectType, ParseResult, RangeVar, String as PgString, TypeName, }, Node, NodeEnum, }; @@ -47,6 +47,92 @@ fn parse(query: &str) -> Result { } } +fn schema_name(relation: &RangeVar) -> &str { + if relation.schemaname.is_empty() { + "public" + } else { + relation.schemaname.as_str() + } +} + +/// Track primary key columns that are INTEGER types. +fn track_primary_keys<'a>( + cons: &'a Constraint, + table: &'a RangeVar, + column_types: &HashMap, &'a str>, + integer_primary_keys: &mut HashSet>, +) { + let schema = schema_name(table); + let table_name = table.relname.as_str(); + + for key in &cons.keys { + let Some(NodeEnum::String(PgString { sval })) = &key.node else { + continue; + }; + + let col_name = sval.as_str(); + let key = ColumnTypeKey { + schema, + table: table_name, + column: col_name, + }; + + if let Some(&type_name) = column_types.get(&key) { + if matches!( + type_name, + "int4" | "int2" | "serial" | "smallserial" | "integer" | "smallint" + ) { + integer_primary_keys.insert(Column { + name: col_name, + table: Some(table_name), + schema: Some(schema), + }); + } + } + } +} + +/// Track foreign key columns that reference integer primary keys. +fn track_foreign_keys<'a>( + cons: &'a Constraint, + fk_table: &'a RangeVar, + integer_primary_keys: &HashSet>, + integer_foreign_keys: &mut HashSet>, +) { + let Some(ref pk_table) = cons.pktable else { + return; + }; + + let pk_schema = schema_name(pk_table); + let pk_table_name = pk_table.relname.as_str(); + let fk_schema = schema_name(fk_table); + let fk_table_name = fk_table.relname.as_str(); + + for (pk_attr, fk_attr) in cons.pk_attrs.iter().zip(cons.fk_attrs.iter()) { + let ( + Some(NodeEnum::String(PgString { sval: pk_col })), + Some(NodeEnum::String(PgString { sval: fk_col })), + ) = (&pk_attr.node, &fk_attr.node) + else { + continue; + }; + + let pk_column = Column { + name: pk_col.as_str(), + table: Some(pk_table_name), + schema: Some(pk_schema), + }; + + if integer_primary_keys.contains(&pk_column) { + integer_foreign_keys.insert(Column { + name: fk_col.as_str(), + table: Some(fk_table_name), + schema: Some(fk_schema), + }); + } + } +} + use tokio::process::Command; #[derive(Debug, Clone)] @@ -236,6 +322,7 @@ impl PgDumpOutput { pub fn statements(&self, state: SyncState) -> Result>, Error> { let mut result = vec![]; let mut integer_primary_keys = HashSet::>::new(); + let mut integer_foreign_keys = HashSet::>::new(); let mut column_types: HashMap, &str> = HashMap::new(); for stmt in &self.stmts.stmts { @@ -341,58 +428,18 @@ impl PgDumpOutput { | ConstrType::ConstrNotnull | ConstrType::ConstrNull ) { - // Track INTEGER primary keys if cons.contype() == ConstrType::ConstrPrimary { if let Some(ref relation) = stmt.relation { - let schema = if relation - .schemaname - .is_empty() - { - "public" - } else { - relation.schemaname.as_str() - }; - let table_name = - relation.relname.as_str(); - - for key in &cons.keys { - if let Some(NodeEnum::String( - PgString { sval }, - )) = &key.node - { - let col_name = - sval.as_str(); - let key = ColumnTypeKey { - schema, - table: table_name, - column: col_name, - }; - if let Some(&type_name) = - column_types.get(&key) - { - // Check for INTEGER types: int4, int2, serial, smallserial - if matches!( - type_name, - "int4" - | "int2" - | "serial" - | "smallserial" - | "integer" - | "smallint" - ) { - integer_primary_keys.insert(Column { - name: col_name, - table: Some(table_name), - schema: Some(schema), - }); - } - } - } - } + track_primary_keys( + cons, + relation, + &column_types, + &mut integer_primary_keys, + ); } } @@ -402,6 +449,24 @@ impl PgDumpOutput { idempotent: false, }); } + } else if cons.contype() + == ConstrType::ConstrForeign + { + if let Some(ref relation) = stmt.relation { + track_foreign_keys( + cons, + relation, + &integer_primary_keys, + &mut integer_foreign_keys, + ); + } + + if state == SyncState::PostData { + result.push(Statement::Other { + sql: original.to_string(), + idempotent: false, + }); + } } else if state == SyncState::PostData { result.push(Statement::Other { sql: original.to_string(), @@ -650,9 +715,12 @@ impl PgDumpOutput { } } - // Convert INTEGER primary keys to BIGINT + // Convert INTEGER primary keys and their referencing foreign keys to BIGINT if state == SyncState::PreData { - for column in &integer_primary_keys { + for column in integer_primary_keys + .iter() + .chain(integer_foreign_keys.iter()) + { let alter_stmt = AlterTableStmt { relation: Some(RangeVar { schemaname: column.schema.unwrap_or("public").to_owned(), @@ -892,4 +960,42 @@ ALTER TABLE test ADD CONSTRAINT id_pkey PRIMARY KEY (id);"#; "ALTER TABLE public.test ALTER COLUMN id TYPE bigint" ); } + + #[test] + fn test_bigint_rewrite_foreign_key() { + let query = r#" +CREATE TABLE parent (id INTEGER, name TEXT); +CREATE TABLE child (id INTEGER, parent_id INTEGER); +ALTER TABLE parent ADD CONSTRAINT parent_pkey PRIMARY KEY (id); +ALTER TABLE child ADD CONSTRAINT child_parent_fk FOREIGN KEY (parent_id) REFERENCES parent(id);"#; + + let output = PgDumpOutput { + stmts: parse(query).unwrap().protobuf, + original: query.to_owned(), + }; + + let statements = output.statements(SyncState::PreData).unwrap(); + assert_eq!(statements.len(), 5); + + assert_eq!( + statements[0].deref(), + "CREATE TABLE IF NOT EXISTS parent (id int, name text)" + ); + assert_eq!( + statements[1].deref(), + "CREATE TABLE IF NOT EXISTS child (id int, parent_id int)" + ); + assert_eq!( + statements[2].deref(), + "\nALTER TABLE parent ADD CONSTRAINT parent_pkey PRIMARY KEY (id)" + ); + assert_eq!( + statements[3].deref(), + "ALTER TABLE public.parent ALTER COLUMN id TYPE bigint" + ); + assert_eq!( + statements[4].deref(), + "ALTER TABLE public.child ALTER COLUMN parent_id TYPE bigint" + ); + } } diff --git a/pgdog/src/frontend/router/parser/comment.rs b/pgdog/src/frontend/router/parser/comment.rs index 171a00d0..a87883ad 100644 --- a/pgdog/src/frontend/router/parser/comment.rs +++ b/pgdog/src/frontend/router/parser/comment.rs @@ -88,6 +88,8 @@ pub fn comment( #[cfg(test)] mod tests { + use pgdog_config::SystemCatalogsBehavior; + use super::*; #[test] @@ -159,7 +161,7 @@ mod tests { let schema = ShardingSchema { shards: 2, - tables: ShardedTables::new(vec![], vec![], false), + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), ..Default::default() }; @@ -174,7 +176,7 @@ mod tests { let schema = ShardingSchema { shards: 3, - tables: ShardedTables::new(vec![], vec![], false), + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), ..Default::default() }; @@ -190,7 +192,7 @@ mod tests { let schema = ShardingSchema { shards: 2, - tables: ShardedTables::new(vec![], vec![], false), + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), ..Default::default() }; @@ -205,7 +207,7 @@ mod tests { let schema = ShardingSchema { shards: 2, - tables: ShardedTables::new(vec![], vec![], false), + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), ..Default::default() }; @@ -220,7 +222,7 @@ mod tests { let schema = ShardingSchema { shards: 2, - tables: ShardedTables::new(vec![], vec![], false), + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), ..Default::default() }; @@ -244,7 +246,7 @@ mod tests { let schema = ShardingSchema { shards: 2, - tables: ShardedTables::new(vec![], vec![], false), + tables: ShardedTables::new(vec![], vec![], false, SystemCatalogsBehavior::default()), schemas: ShardedSchemas::new(vec![sales_schema]), ..Default::default() }; diff --git a/pgdog/src/frontend/router/parser/query/select.rs b/pgdog/src/frontend/router/parser/query/select.rs index cf054b59..56770175 100644 --- a/pgdog/src/frontend/router/parser/query/select.rs +++ b/pgdog/src/frontend/router/parser/query/select.rs @@ -3,6 +3,7 @@ use crate::frontend::router::parser::{ }; use super::*; +use pgdog_config::system_catalogs; use shared::ConvergeAlgorithm; impl QueryParser { @@ -131,40 +132,60 @@ impl QueryParser { .shards_calculator .push(ShardWithPriority::new_table(Shard::All)); } else { - debug!( - "table is not sharded, defaulting to omnisharded (schema loaded: {})", - context.router_context.schema.is_loaded() - ); + let system_catalog_sharded = context + .sharding_schema + .tables() + .is_system_catalog_sharded() + .then(|| { + tables + .iter() + .any(|table| system_catalogs().contains(&table.name)) + }) + .unwrap_or_default(); + + if system_catalog_sharded { + debug!("system catalog sharded"); - // Omnisharded by default. - let sticky = tables.iter().any(|table| { context - .sharding_schema - .tables() - .is_omnisharded_sticky(table.name) - == Some(true) - }); - - let (rr_index, explain) = if sticky - || context - .sharding_schema - .tables() - .is_omnisharded_sticky_default() - { - (context.router_context.sticky.omni_index, "sticky") + .shards_calculator + .push(ShardWithPriority::new_table(Shard::All)); } else { - (round_robin::next(), "round robin") - }; + debug!( + "table is not sharded, defaulting to omnisharded (schema loaded: {})", + context.router_context.schema.is_loaded() + ); + + // Omnisharded by default. + let sticky = tables.iter().any(|table| { + context + .sharding_schema + .tables() + .is_omnisharded_sticky(table.name) + == Some(true) + }); + + let (rr_index, explain) = if sticky + || context + .sharding_schema + .tables() + .is_omnisharded_sticky_default() + { + (context.router_context.sticky.omni_index, "sticky") + } else { + (round_robin::next(), "round robin") + }; - let shard = Shard::Direct(rr_index % context.shards); + let shard = Shard::Direct(rr_index % context.shards); - if let Some(recorder) = self.recorder_mut() { - recorder.record_entry(Some(shard.clone()), format!("SELECT omnishard {}", explain)); - } + if let Some(recorder) = self.recorder_mut() { + recorder + .record_entry(Some(shard.clone()), format!("SELECT omnishard {}", explain)); + } - context - .shards_calculator - .push(ShardWithPriority::new_rr_omni(shard)); + context + .shards_calculator + .push(ShardWithPriority::new_rr_omni(shard)); + } } let mut query = Route::select( diff --git a/pgdog/src/frontend/router/parser/query/test/test_select.rs b/pgdog/src/frontend/router/parser/query/test/test_select.rs index 3ef4f446..fedc73ca 100644 --- a/pgdog/src/frontend/router/parser/query/test/test_select.rs +++ b/pgdog/src/frontend/router/parser/query/test/test_select.rs @@ -203,3 +203,60 @@ fn test_omnisharded_sticky_config_disabled() { "with omnisharded_sticky=false, queries should be load-balanced across shards" ); } + +#[test] +fn test_system_catalog_sharded() { + use pgdog_config::SystemCatalogsBehavior; + + let mut updated = config().deref().clone(); + updated.config.general.system_catalogs = SystemCatalogsBehavior::Sharded; + config::set(updated).unwrap(); + + let mut test = QueryParserTest::new_with_config(&config()); + + let command = test.execute(vec![Query::new("SELECT * FROM pg_class").into()]); + assert_eq!( + command.route().shard(), + &Shard::All, + "system catalog query with Sharded behavior should go to all shards" + ); + + let command = test.execute(vec![Query::new( + "SELECT * FROM pg_type WHERE typname = 'int4'", + ) + .into()]); + assert_eq!( + command.route().shard(), + &Shard::All, + "system catalog query with WHERE clause should still go to all shards" + ); + + // Reset to default + let mut updated = config().deref().clone(); + updated.config.general.system_catalogs = SystemCatalogsBehavior::default(); + config::set(updated).unwrap(); +} + +#[test] +fn test_system_catalog_omnisharded_default() { + use pgdog_config::SystemCatalogsBehavior; + + let mut updated = config().deref().clone(); + updated.config.general.system_catalogs = SystemCatalogsBehavior::OmnishardedSticky; + config::set(updated).unwrap(); + + let mut test = QueryParserTest::new_with_config(&config()); + + // Without Sharded mode, system catalog queries use omnisharded routing + // (goes to a single shard, not all shards) + let command = test.execute(vec![Query::new("SELECT * FROM pg_class").into()]); + assert!( + matches!(command.route().shard(), Shard::Direct(_)), + "system catalog query with OmnishardedSticky should go to a single shard, not Shard::All" + ); + + // Reset to default + let mut updated = config().deref().clone(); + updated.config.general.system_catalogs = SystemCatalogsBehavior::default(); + config::set(updated).unwrap(); +} diff --git a/pgdog/src/frontend/router/parser/rewrite/statement/update.rs b/pgdog/src/frontend/router/parser/rewrite/statement/update.rs index 79f0c3de..93cee826 100644 --- a/pgdog/src/frontend/router/parser/rewrite/statement/update.rs +++ b/pgdog/src/frontend/router/parser/rewrite/statement/update.rs @@ -679,6 +679,7 @@ mod test { }], vec![], false, + pgdog_config::SystemCatalogsBehavior::default(), ), schemas: ShardedSchemas::new(vec![]), rewrite: Rewrite { diff --git a/pgdog/src/frontend/router/parser/statement.rs b/pgdog/src/frontend/router/parser/statement.rs index 17bf9a14..ac31fd2d 100644 --- a/pgdog/src/frontend/router/parser/statement.rs +++ b/pgdog/src/frontend/router/parser/statement.rs @@ -1248,7 +1248,10 @@ impl<'a, 'b, 'c> StatementParser<'a, 'b, 'c> { #[cfg(test)] mod test { - use pgdog_config::{FlexibleType, Mapping, ShardedMapping, ShardedMappingKind, ShardedTable}; + use pgdog_config::{ + FlexibleType, Mapping, ShardedMapping, ShardedMappingKind, ShardedTable, + SystemCatalogsBehavior, + }; use crate::backend::ShardedTables; use crate::net::messages::{Bind, Parameter}; @@ -1290,6 +1293,7 @@ mod test { ], vec![], false, + SystemCatalogsBehavior::default(), ), ..Default::default() }; @@ -2097,6 +2101,7 @@ mod test { }], vec![], false, + SystemCatalogsBehavior::default(), ), schemas: ShardedSchemas::new(vec![ ShardedSchema { @@ -2227,6 +2232,7 @@ mod test { }], vec![], false, + SystemCatalogsBehavior::default(), ), ..Default::default() }; @@ -2377,6 +2383,7 @@ mod test { }], vec![], false, + SystemCatalogsBehavior::default(), ), ..Default::default() }; diff --git a/pgdog/src/frontend/router/sharding/context_builder.rs b/pgdog/src/frontend/router/sharding/context_builder.rs index 1f398f27..11207255 100644 --- a/pgdog/src/frontend/router/sharding/context_builder.rs +++ b/pgdog/src/frontend/router/sharding/context_builder.rs @@ -192,6 +192,8 @@ impl<'a> ContextBuilder<'a> { #[cfg(test)] mod test { + use pgdog_config::SystemCatalogsBehavior; + use crate::{ backend::ShardedTables, config::{FlexibleType, ShardedMapping, ShardedMappingKind}, @@ -212,6 +214,7 @@ mod test { }], vec![], false, + SystemCatalogsBehavior::default(), ), ..Default::default() }; @@ -243,6 +246,7 @@ mod test { }], vec![], false, + SystemCatalogsBehavior::default(), ), ..Default::default() }; @@ -275,6 +279,7 @@ mod test { }], vec![], false, + SystemCatalogsBehavior::default(), ), ..Default::default() };