From dbddb6549af56209a2f1e245d39fe84f14bf6998 Mon Sep 17 00:00:00 2001 From: timeless Date: Mon, 1 Dec 2025 15:52:23 -0600 Subject: [PATCH 01/11] get all commits from our ext repo --- lib/pgslice.rb | 1 + lib/pgslice/cli.rb | 22 +++ lib/pgslice/cli/fill.rb | 96 ++++++++--- lib/pgslice/cli/synchronize.rb | 300 +++++++++++++++++++++++++++++++++ lib/pgslice/helpers.rb | 211 +++++++++++++++++++++-- lib/pgslice/table.rb | 32 +++- 6 files changed, 630 insertions(+), 32 deletions(-) create mode 100644 lib/pgslice/cli/synchronize.rb diff --git a/lib/pgslice.rb b/lib/pgslice.rb index a8bf0cf..4732176 100644 --- a/lib/pgslice.rb +++ b/lib/pgslice.rb @@ -21,3 +21,4 @@ require_relative "pgslice/cli/swap" require_relative "pgslice/cli/unprep" require_relative "pgslice/cli/unswap" +require_relative "pgslice/cli/synchronize" diff --git a/lib/pgslice/cli.rb b/lib/pgslice/cli.rb index ff22dc0..29b708a 100644 --- a/lib/pgslice/cli.rb +++ b/lib/pgslice/cli.rb @@ -26,5 +26,27 @@ def initialize(*args) def version log("pgslice #{PgSlice::VERSION}") end + + desc "enable_mirroring TABLE", "Enable mirroring triggers for live data changes during partitioning" + def enable_mirroring(table_name) + table = create_table(table_name) + intermediate_table = table.intermediate_table + + assert_table(table) + assert_table(intermediate_table) + + enable_mirroring_triggers(table) + log("Mirroring triggers enabled for #{table_name}") + end + + desc "disable_mirroring TABLE", "Disable mirroring triggers after partitioning is complete" + def disable_mirroring(table_name) + table = create_table(table_name) + + assert_table(table) + + disable_mirroring_triggers(table) + log("Mirroring triggers disabled for #{table_name}") + end end end diff --git a/lib/pgslice/cli/fill.rb b/lib/pgslice/cli/fill.rb index 64a3f53..a4a85c0 100644 --- a/lib/pgslice/cli/fill.rb +++ b/lib/pgslice/cli/fill.rb @@ -5,7 +5,7 @@ class CLI option :swapped, type: :boolean, default: false, desc: "Use swapped table" option :source_table, desc: "Source table" option :dest_table, desc: "Destination table" - option :start, type: :numeric, desc: "Primary key to start" + option :start, type: :string, desc: "Primary key to start (numeric or ULID)" option :where, desc: "Conditions to filter" option :sleep, type: :numeric, desc: "Seconds to sleep between batches" def fill(table) @@ -45,21 +45,44 @@ def fill(table) begin max_source_id = source_table.max_id(primary_key) rescue PG::UndefinedFunction - abort "Only numeric primary keys are supported" + abort "Only numeric and ULID primary keys are supported" end max_dest_id = if options[:start] - options[:start] + # Convert to appropriate type + start_val = options[:start] + numeric_id?(start_val) ? 
start_val.to_i : start_val elsif options[:swapped] dest_table.max_id(primary_key, where: options[:where], below: max_source_id) else dest_table.max_id(primary_key, where: options[:where]) end - if max_dest_id == 0 && !options[:swapped] + # Get the appropriate handler for the ID type + # Prefer --start option, then max_source_id, then sample from table + handler = if options[:start] + id_handler(options[:start], connection, source_table, primary_key) + elsif max_source_id + id_handler(max_source_id, connection, source_table, primary_key) + else + # Sample a row to determine ID type + sample_query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(source_table)} LIMIT 1" + log_sql sample_query + sample_result = execute(sample_query)[0] + if sample_result && sample_result[primary_key] + id_handler(sample_result[primary_key], connection, source_table, primary_key) + else + # Default to numeric if we can't determine + Helpers::NumericHandler.new + end + end + + if (max_dest_id == 0 || max_dest_id == handler.min_value) && !options[:swapped] min_source_id = source_table.min_id(primary_key, field, cast, starting_time, options[:where]) - max_dest_id = min_source_id - 1 if min_source_id + if min_source_id + max_dest_id = handler.predecessor(min_source_id) + end end starting_id = max_dest_id @@ -67,14 +90,15 @@ def fill(table) batch_size = options[:batch_size] i = 1 - batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil + batch_count = handler.batch_count(starting_id, max_source_id, batch_size) + first_batch = true if batch_count == 0 log_sql "/* nothing to fill */" end - while starting_id < max_source_id - where = "#{quote_ident(primary_key)} > #{quote(starting_id)} AND #{quote_ident(primary_key)} <= #{quote(starting_id + batch_size)}" + while handler.should_continue?(starting_id, max_source_id) + where = handler.batch_where_condition(primary_key, starting_id, batch_size, first_batch && options[:start]) if starting_time where << " AND #{quote_ident(field)} >= #{sql_date(starting_time, cast)} AND #{quote_ident(field)} < #{sql_date(ending_time, cast)}" end @@ -82,19 +106,53 @@ def fill(table) where << " AND #{options[:where]}" end - query = <<~SQL - /* #{i} of #{batch_count} */ - INSERT INTO #{quote_table(dest_table)} (#{fields}) - SELECT #{fields} FROM #{quote_table(source_table)} - WHERE #{where} - SQL - - run_query(query) - - starting_id += batch_size + batch_label = batch_count ? "#{i} of #{batch_count}" : "batch #{i}" + + if handler.is_a?(UlidHandler) + # For ULIDs, use CTE with RETURNING to get max ID inserted + query = <<~SQL + /* #{batch_label} */ + WITH inserted_batch AS ( + INSERT INTO #{quote_table(dest_table)} (#{fields}) + SELECT #{fields} FROM #{quote_table(source_table)} + WHERE #{where} + ORDER BY #{quote_ident(primary_key)} + LIMIT #{batch_size} + ON CONFLICT DO NOTHING + RETURNING #{quote_ident(primary_key)} + ) + SELECT MAX(#{quote_ident(primary_key)}) as max_inserted_id FROM inserted_batch + SQL + + log_sql query + result = execute(query) + max_inserted_id = result[0]["max_inserted_id"] + puts "starting_id: #{starting_id}" + puts "max_inserted_id: #{max_inserted_id}" + + # If no records were inserted, break the loop + if max_inserted_id.nil? 
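+            # NOTE: RETURNING only reports rows that were actually inserted, so a
+            # batch skipped entirely by ON CONFLICT DO NOTHING also ends the loop
+            # here, even if ids beyond starting_id remain in the source table.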
+ break + end + + starting_id = max_inserted_id + else + query = <<~SQL + /* #{batch_label} */ + INSERT INTO #{quote_table(dest_table)} (#{fields}) + SELECT #{fields} FROM #{quote_table(source_table)} + WHERE #{where} + ON CONFLICT DO NOTHING + SQL + + run_query(query) + starting_id = handler.next_starting_id(starting_id, batch_size) + end + i += 1 + first_batch = false - if options[:sleep] && starting_id <= max_source_id + if options[:sleep] && handler.should_continue?(starting_id, max_source_id) sleep(options[:sleep]) end end diff --git a/lib/pgslice/cli/synchronize.rb b/lib/pgslice/cli/synchronize.rb new file mode 100644 index 0000000..a99db60 --- /dev/null +++ b/lib/pgslice/cli/synchronize.rb @@ -0,0 +1,300 @@ +module PgSlice + class CLI + desc "synchronize TABLE", "Synchronize data between two tables" + option :source_table, type: :string, desc: "Source table to compare (default: TABLE)" + option :target_table, type: :string, desc: "Target table to compare (default: TABLE_intermediate)" + option :primary_key, type: :string, desc: "Primary key column name" + option :start, type: :string, desc: "Primary key value to start synchronization at" + option :window_size, type: :numeric, default: 1000, desc: "Number of rows to synchronize per batch" + option :delay, type: :numeric, default: 0, desc: "Base delay in seconds between batches (M)" + option :delay_multiplier, type: :numeric, default: 0, desc: "Delay multiplier for batch time (P)" + option :read_only, type: :boolean, default: false, desc: "Log SQL statements instead of executing them" + def synchronize(table_name) + table = create_table(table_name) + + # Determine source and target tables + source_table = options[:source_table] ? create_table(options[:source_table]) : table + target_table = options[:target_table] ? create_table(options[:target_table]) : table.intermediate_table + + # Verify both tables exist + assert_table(source_table) + assert_table(target_table) + + # Get and verify schemas match + source_schema = get_table_schema(source_table) + target_schema = get_table_schema(target_table) + verify_schemas_match(source_table, target_table, source_schema, target_schema) + + # Get primary key + primary_key = options[:primary_key] || source_table.primary_key&.first + abort "Primary key not found. Specify with --primary-key" unless primary_key + abort "Primary key '#{primary_key}' not found in source table" unless source_schema[primary_key] + + # Determine starting value + starting_id = options[:start] + unless starting_id + starting_id = get_min_id(source_table, primary_key) + abort "No rows found in source table" unless starting_id + end + + # Get parameters + window_size = options[:window_size] + base_delay = options[:delay] + delay_multiplier = options[:delay_multiplier] + read_only = options[:read_only] + + log "Synchronizing #{source_table} to #{target_table}" + log "Mode: #{read_only ? 
'READ-ONLY (logging only)' : 'WRITE (executing changes)'}" + log "Primary key: #{primary_key}" + log "Starting at: #{starting_id}" + log "Window size: #{window_size}" + log "Base delay: #{base_delay}s" + log "Delay multiplier: #{delay_multiplier}" + log + + # Statistics + stats = { + total_rows: 0, + matching_rows: 0, + rows_with_differences: 0, + missing_rows: 0, + extra_rows: 0, + batches: 0 + } + + columns = source_schema.keys + + # Main synchronization loop + first_batch = true + loop do + batch_start_time = Time.now + + # Fetch batch from source + source_rows = fetch_batch(source_table, primary_key, starting_id, window_size, columns, first_batch) + break if source_rows.empty? + + stats[:batches] += 1 + first_batch = false + stats[:total_rows] += source_rows.size + + # Get primary keys and range from source batch + source_pks = source_rows.map { |row| row[primary_key] } + first_source_pk = source_rows.first[primary_key] + last_source_pk = source_rows.last[primary_key] + + # Fetch corresponding rows from target using range query to catch deletions + target_rows = fetch_rows_by_range(target_table, primary_key, first_source_pk, last_source_pk, columns) + target_rows_by_pk = target_rows.each_with_object({}) { |row, hash| hash[row[primary_key]] = row } + + # Compare and generate fix queries + fix_queries = [] + + source_rows.each do |source_row| + pk_value = source_row[primary_key] + target_row = target_rows_by_pk[pk_value] + + if target_row.nil? + # Missing row in target + stats[:missing_rows] += 1 + fix_queries << generate_insert(target_table, source_row, columns) + elsif rows_differ?(source_row, target_row, columns) + # Rows differ + stats[:rows_with_differences] += 1 + fix_queries << generate_update(target_table, primary_key, source_row, columns) + else + # Rows match + stats[:matching_rows] += 1 + end + end + + # Check for extra rows in target (rows in target but not in source batch) + # Note: This only checks within the current batch window + extra_pks = target_rows_by_pk.keys - source_pks + extra_pks.each do |pk_value| + stats[:extra_rows] += 1 + fix_queries << generate_delete(target_table, primary_key, pk_value) + end + + # Get first and last primary key for logging + first_pk = source_rows.first[primary_key] + last_pk = source_rows.last[primary_key] + pk_range = first_pk == last_pk ? "#{first_pk}" : "#{first_pk}...#{last_pk}" + + # Execute or log fix queries + if fix_queries.any? 
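+          # When executing (not read-only), run_queries applies this batch's
+          # fixes atomically in a single transaction via connection.transaction.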
+ log_with_timestamp "Batch #{stats[:batches]}: Found #{fix_queries.size} differences (keys in range #{pk_range})" + if read_only + log_sql "-- Read-only mode: logging statements (not executing)" + fix_queries.each { |query| log_sql query } + log_sql + else + # In write mode, log truncated SQL and execute without auto-logging + fix_queries.each { |query| log_sql truncate_sql_for_log(query) } + run_queries(fix_queries, silent: true) + end + else + log_with_timestamp "Batch #{stats[:batches]}: All #{source_rows.size} rows match (keys in range #{pk_range})" + end + + # Update starting_id for next batch (use > not >=) + starting_id = source_rows.last[primary_key] + + # Calculate adaptive delay: M + N*P + batch_duration = Time.now - batch_start_time + sleep_time = base_delay + (batch_duration * delay_multiplier) + if sleep_time > 0 + log_with_timestamp "Sleeping #{sleep_time.round(2)}s (#{base_delay}s base + #{batch_duration.round(2)}s batch time * #{delay_multiplier} multiplier)" + sleep(sleep_time) + end + + # Break if we processed fewer rows than window size (last batch) + break if source_rows.size < window_size + end + + # Print summary + log + log "Synchronization complete" + log "=" * 50 + log "Total batches: #{stats[:batches]}" + log "Total rows compared: #{stats[:total_rows]}" + log "Matching rows: #{stats[:matching_rows]}" + log "Rows with differences: #{stats[:rows_with_differences]}" + log "Missing rows: #{stats[:missing_rows]}" + log "Extra rows: #{stats[:extra_rows]}" + end + + private + + def log_with_timestamp(message) + timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S") + log "[#{timestamp}] #{message}" + end + + def get_table_schema(table) + query = <<~SQL + SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale + FROM information_schema.columns + WHERE table_schema = $1 AND table_name = $2 AND is_generated = 'NEVER' + ORDER BY ordinal_position + SQL + rows = execute(query, [table.schema, table.name]) + rows.each_with_object({}) do |row, hash| + hash[row["column_name"]] = { + data_type: row["data_type"], + character_maximum_length: row["character_maximum_length"], + numeric_precision: row["numeric_precision"], + numeric_scale: row["numeric_scale"] + } + end + end + + def verify_schemas_match(source_table, target_table, source_schema, target_schema) + source_schema.each do |col_name, col_spec| + target_spec = target_schema[col_name] + abort "Column '#{col_name}' exists in #{source_table} but not in #{target_table}" unless target_spec + + if col_spec[:data_type] != target_spec[:data_type] + abort "Column '#{col_name}' type mismatch: #{source_table} has #{col_spec[:data_type]}, #{target_table} has #{target_spec[:data_type]}" + end + end + + target_schema.each do |col_name, _| + abort "Column '#{col_name}' exists in #{target_table} but not in #{source_table}" unless source_schema[col_name] + end + end + + def get_min_id(table, primary_key) + query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(table)} ORDER BY #{quote_ident(primary_key)} LIMIT 1" + result = execute(query) + result.first&.values&.first + end + + def fetch_batch(table, primary_key, starting_id, limit, columns, first_batch = false) + column_list = columns.map { |c| quote_ident(c) }.join(", ") + # Use >= for first batch to include starting_id, > for subsequent batches + operator = first_batch ? 
">=" : ">" + query = <<~SQL + SELECT #{column_list} + FROM #{quote_table(table)} + WHERE #{quote_ident(primary_key)} #{operator} #{quote(starting_id)} + ORDER BY #{quote_ident(primary_key)} + LIMIT #{limit.to_i} + SQL + execute(query) + end + + def fetch_rows_by_pks(table, primary_key, pk_values, columns) + return [] if pk_values.empty? + + column_list = columns.map { |c| quote_ident(c) }.join(", ") + # Build IN clause with proper quoting + pk_list = pk_values.map { |pk| quote(pk) }.join(", ") + query = <<~SQL + SELECT #{column_list} + FROM #{quote_table(table)} + WHERE #{quote_ident(primary_key)} IN (#{pk_list}) + SQL + execute(query) + end + + def fetch_rows_by_range(table, primary_key, first_pk, last_pk, columns) + column_list = columns.map { |c| quote_ident(c) }.join(", ") + query = <<~SQL + SELECT #{column_list} + FROM #{quote_table(table)} + WHERE #{quote_ident(primary_key)} >= #{quote(first_pk)} + AND #{quote_ident(primary_key)} <= #{quote(last_pk)} + ORDER BY #{quote_ident(primary_key)} + SQL + execute(query) + end + + def rows_differ?(source_row, target_row, columns) + columns.any? { |col| source_row[col] != target_row[col] } + end + + def generate_insert(table, row, columns) + column_list = columns.map { |c| quote_ident(c) }.join(", ") + value_list = columns.map { |c| quote(row[c]) }.join(", ") + "INSERT INTO #{quote_table(table)} (#{column_list}) VALUES (#{value_list});" + end + + def generate_update(table, primary_key, row, columns) + set_clause = columns.reject { |c| c == primary_key }.map { |c| "#{quote_ident(c)} = #{quote(row[c])}" }.join(", ") + "UPDATE #{quote_table(table)} SET #{set_clause} WHERE #{quote_ident(primary_key)} = #{quote(row[primary_key])};" + end + + def generate_delete(table, primary_key, pk_value) + "DELETE FROM #{quote_table(table)} WHERE #{quote_ident(primary_key)} = #{quote(pk_value)};" + end + + def truncate_sql_for_log(sql) + # For INSERT statements: show "INSERT INTO table... VALUES(first 20 chars...[truncated]" + if sql =~ /\A(INSERT INTO [^\s]+)\s.*?\sVALUES\s*\((.*)\);?\z/i + table_part = $1 + values_part = $2 + preview = values_part[0, 20] + return "#{table_part}... VALUES(#{preview}...[truncated]" + end + + # For UPDATE statements: show "UPDATE table... SET...[truncated]" + if sql =~ /\A(UPDATE [^\s]+)\s+SET\s+(.*?)\s+WHERE/i + table_part = $1 + set_part = $2 + preview = set_part[0, 20] + return "#{table_part}... SET #{preview}...[truncated]" + end + + # For DELETE statements: show "DELETE FROM table WHERE...[truncated]" + if sql =~ /\A(DELETE FROM [^\s]+)\s+WHERE\s+(.*);?\z/i + table_part = $1 + where_part = $2 + preview = where_part[0, 20] + return "#{table_part}... 
WHERE #{preview}...[truncated]" + end + + # Fallback: just show first 50 chars + sql[0, 50] + "...[truncated]" + end + end +end diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb index ee2b1b3..cae872a 100644 --- a/lib/pgslice/helpers.rb +++ b/lib/pgslice/helpers.rb @@ -6,6 +6,9 @@ module Helpers year: "YYYY" } + # ULID epoch start corresponding to 01/01/1970 + DEFAULT_ULID = "00000H5A406P0C3DQMCQ5MV6WQ" + protected # output @@ -61,18 +64,20 @@ def execute(query, params = []) connection.exec_params(query, params).to_a end - def run_queries(queries) + def run_queries(queries, silent: false) connection.transaction do execute("SET LOCAL client_min_messages TO warning") unless options[:dry_run] - log_sql "BEGIN;" - log_sql - run_queries_without_transaction(queries) - log_sql "COMMIT;" + unless silent + log_sql "BEGIN;" + log_sql + end + run_queries_without_transaction(queries, silent: silent) + log_sql "COMMIT;" unless silent end end - def run_query(query) - log_sql query + def run_query(query, silent: false) + log_sql query unless silent unless options[:dry_run] begin execute(query) @@ -80,12 +85,12 @@ def run_query(query) abort "#{e.class.name}: #{e.message}" end end - log_sql + log_sql unless silent end - def run_queries_without_transaction(queries) + def run_queries_without_transaction(queries, silent: false) queries.each do |query| - run_query(query) + run_query(query, silent: silent) end end @@ -167,7 +172,9 @@ def quote_ident(value) end def quote(value) - if value.is_a?(Numeric) + if value.nil? + "NULL" + elsif value.is_a?(Numeric) value else connection.escape_literal(value) @@ -178,6 +185,108 @@ def quote_table(table) table.quote_table end + # ULID helper methods + def ulid?(value) + return false unless value.is_a?(String) + # Match pure ULIDs or ULIDs with prefixes + value.match?(/\A[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) || + value.match?(/.*[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) + end + + def numeric_id?(value) + value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/)) + end + + def id_type(value) + return :numeric if numeric_id?(value) + return :ulid if ulid?(value) + :unknown + end + + # Factory method to get the appropriate ID handler + def id_handler(sample_id, connection = nil, table = nil, primary_key = nil) + if ulid?(sample_id) + UlidHandler.new(connection, table, primary_key) + else + NumericHandler.new + end + end + + class NumericHandler + def min_value + 1 + end + + def predecessor(id) + id - 1 + end + + def should_continue?(current_id, max_id) + current_id < max_id + end + + def batch_count(starting_id, max_id, batch_size) + ((max_id - starting_id) / batch_size.to_f).ceil + end + + def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) + helpers = PgSlice::CLI.instance + operator = inclusive ? 
">=" : ">" + "#{helpers.quote_ident(primary_key)} #{operator} #{helpers.quote(starting_id)} AND #{helpers.quote_ident(primary_key)} <= #{helpers.quote(starting_id + batch_size)}" + end + + def next_starting_id(starting_id, batch_size) + starting_id + batch_size + end + end + + class UlidHandler + def initialize(connection = nil, table = nil, primary_key = nil) + @connection = connection + @table = table + @primary_key = primary_key + end + + def min_value + PgSlice::Helpers::DEFAULT_ULID + end + + def predecessor(id) + # Use database lookup to find the actual predecessor + return PgSlice::Helpers::DEFAULT_ULID unless @connection && @table && @primary_key + + query = <<~SQL + SELECT MAX(#{PG::Connection.quote_ident(@primary_key)}) + FROM #{@table.quote_table} + WHERE #{PG::Connection.quote_ident(@primary_key)} < '#{id}' + SQL + + log_sql query + result = @connection.exec(query) + predecessor_id = result[0]["max"] + predecessor_id || PgSlice::Helpers::DEFAULT_ULID + end + + def should_continue?(current_id, max_id) + current_id < max_id + end + + def batch_count(starting_id, max_id, batch_size) + nil # Unknown for ULIDs + end + + def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) + operator = inclusive ? ">=" : ">" + "#{PG::Connection.quote_ident(primary_key)} #{operator} '#{starting_id}'" + end + + def next_starting_id(starting_id, batch_size) + # For ULIDs, we need to get the max ID from the current batch + # This will be handled in the fill logic + nil + end + end + def quote_no_schema(table) quote_ident(table.name) end @@ -205,5 +314,85 @@ def make_stat_def(stat_def, table) stat_name = "#{table}_#{m[1].split(", ").map { |v| v.gsub(/\W/i, "") }.join("_")}_stat" stat_def.sub(/ FROM \S+/, " FROM #{quote_table(table)}").sub(/ STATISTICS .+ ON /, " STATISTICS #{quote_ident(stat_name)} ON ") + ";" end + + # mirroring triggers + + def enable_mirroring_triggers(table) + intermediate_table = table.intermediate_table + function_name = "#{table.name}_mirror_to_intermediate" + trigger_name = "#{table.name}_mirror_trigger" + + queries = [] + + # create mirror function + queries << <<~SQL + CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}() + RETURNS TRIGGER AS $$ + BEGIN + IF TG_OP = 'DELETE' THEN + DELETE FROM #{quote_table(intermediate_table)} WHERE #{mirror_where_clause(table, 'OLD')}; + RETURN OLD; + ELSIF TG_OP = 'UPDATE' THEN + UPDATE #{quote_table(intermediate_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')}; + RETURN NEW; + ELSIF TG_OP = 'INSERT' THEN + INSERT INTO #{quote_table(intermediate_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)}); + RETURN NEW; + END IF; + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + SQL + + # create trigger + queries << <<~SQL + CREATE TRIGGER #{quote_ident(trigger_name)} + AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)} + FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}(); + SQL + + run_queries(queries) + end + + def disable_mirroring_triggers(table) + function_name = "#{table.name}_mirror_to_intermediate" + trigger_name = "#{table.name}_mirror_trigger" + + queries = [] + + # drop trigger + queries << <<~SQL + DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)}; + SQL + + # drop function + queries << <<~SQL + DROP FUNCTION IF EXISTS #{quote_ident(function_name)}(); + SQL + + run_queries(queries) + end + + def mirror_column_list(table) + table.columns.map { |column| quote_ident(column) }.join(", ") + end + + def 
mirror_new_tuple_list(table) + table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ") + end + + def mirror_set_clause(table) + table.columns.map { |column| "#{quote_ident(column)} = NEW.#{quote_ident(column)}" }.join(", ") + end + + def mirror_where_clause(table, record) + primary_keys = table.primary_key + if primary_keys && primary_keys.any? + primary_keys.map { |pk| "#{quote_ident(pk)} = #{record}.#{quote_ident(pk)}" }.join(" AND ") + else + # fallback to all columns if no primary key + table.columns.map { |column| "#{quote_ident(column)} = #{record}.#{quote_ident(column)}" }.join(" AND ") + end + end end end diff --git a/lib/pgslice/table.rb b/lib/pgslice/table.rb index 4e2284e..b3f7e7d 100644 --- a/lib/pgslice/table.rb +++ b/lib/pgslice/table.rb @@ -123,7 +123,11 @@ def max_id(primary_key, below: nil, where: nil) end conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? - execute(query, params)[0]["max"].to_i + result = execute(query, params)[0]["max"] + return result if result.nil? + + # For ULIDs, return as string; for numeric, convert to int + numeric_id?(result) ? result.to_i : result end def min_id(primary_key, column, cast, starting_time, where) @@ -132,7 +136,23 @@ def min_id(primary_key, column, cast, starting_time, where) conditions << "#{quote_ident(column)} >= #{sql_date(starting_time, cast)}" if starting_time conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? - (execute(query)[0]["min"] || 1).to_i + result = execute(query)[0]["min"] + + # Return appropriate default and type based on primary key type + if result.nil? + # Check if we're dealing with ULIDs by sampling a row + sample_query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table} LIMIT 1" + sample_result = execute(sample_query)[0] + if sample_result + handler = id_handler(sample_result[primary_key]) + return handler.min_value + else + return 1 # Default numeric when no sample available + end + end + + # Return the actual result with proper type + numeric_id?(result) ? result.to_i : result end # ensure this returns partitions in the correct order @@ -224,5 +244,13 @@ def quote_ident(value) def sql_date(*args) PgSlice::CLI.instance.send(:sql_date, *args) end + + def numeric_id?(value) + PgSlice::CLI.instance.send(:numeric_id?, value) + end + + def id_handler(sample_id) + PgSlice::CLI.instance.send(:id_handler, sample_id) + end end end From a0b027efcb33f6af17f7c708fa226a1f18b8ffda Mon Sep 17 00:00:00 2001 From: timeless Date: Wed, 3 Dec 2025 10:01:24 -0600 Subject: [PATCH 02/11] Revert "get all commits from our ext repo" This reverts commit dbddb6549af56209a2f1e245d39fe84f14bf6998. 
--- lib/pgslice.rb | 1 - lib/pgslice/cli.rb | 22 --- lib/pgslice/cli/fill.rb | 96 +++-------- lib/pgslice/cli/synchronize.rb | 300 --------------------------------- lib/pgslice/helpers.rb | 211 ++--------------------- lib/pgslice/table.rb | 32 +--- 6 files changed, 32 insertions(+), 630 deletions(-) delete mode 100644 lib/pgslice/cli/synchronize.rb diff --git a/lib/pgslice.rb b/lib/pgslice.rb index 4732176..a8bf0cf 100644 --- a/lib/pgslice.rb +++ b/lib/pgslice.rb @@ -21,4 +21,3 @@ require_relative "pgslice/cli/swap" require_relative "pgslice/cli/unprep" require_relative "pgslice/cli/unswap" -require_relative "pgslice/cli/synchronize" diff --git a/lib/pgslice/cli.rb b/lib/pgslice/cli.rb index 29b708a..ff22dc0 100644 --- a/lib/pgslice/cli.rb +++ b/lib/pgslice/cli.rb @@ -26,27 +26,5 @@ def initialize(*args) def version log("pgslice #{PgSlice::VERSION}") end - - desc "enable_mirroring TABLE", "Enable mirroring triggers for live data changes during partitioning" - def enable_mirroring(table_name) - table = create_table(table_name) - intermediate_table = table.intermediate_table - - assert_table(table) - assert_table(intermediate_table) - - enable_mirroring_triggers(table) - log("Mirroring triggers enabled for #{table_name}") - end - - desc "disable_mirroring TABLE", "Disable mirroring triggers after partitioning is complete" - def disable_mirroring(table_name) - table = create_table(table_name) - - assert_table(table) - - disable_mirroring_triggers(table) - log("Mirroring triggers disabled for #{table_name}") - end end end diff --git a/lib/pgslice/cli/fill.rb b/lib/pgslice/cli/fill.rb index a4a85c0..64a3f53 100644 --- a/lib/pgslice/cli/fill.rb +++ b/lib/pgslice/cli/fill.rb @@ -5,7 +5,7 @@ class CLI option :swapped, type: :boolean, default: false, desc: "Use swapped table" option :source_table, desc: "Source table" option :dest_table, desc: "Destination table" - option :start, type: :string, desc: "Primary key to start (numeric or ULID)" + option :start, type: :numeric, desc: "Primary key to start" option :where, desc: "Conditions to filter" option :sleep, type: :numeric, desc: "Seconds to sleep between batches" def fill(table) @@ -45,44 +45,21 @@ def fill(table) begin max_source_id = source_table.max_id(primary_key) rescue PG::UndefinedFunction - abort "Only numeric and ULID primary keys are supported" + abort "Only numeric primary keys are supported" end max_dest_id = if options[:start] - # Convert to appropriate type - start_val = options[:start] - numeric_id?(start_val) ? 
start_val.to_i : start_val + options[:start] elsif options[:swapped] dest_table.max_id(primary_key, where: options[:where], below: max_source_id) else dest_table.max_id(primary_key, where: options[:where]) end - # Get the appropriate handler for the ID type - # Prefer --start option, then max_source_id, then sample from table - handler = if options[:start] - id_handler(options[:start], connection, source_table, primary_key) - elsif max_source_id - id_handler(max_source_id, connection, source_table, primary_key) - else - # Sample a row to determine ID type - sample_query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(source_table)} LIMIT 1" - log_sql sample_query - sample_result = execute(sample_query)[0] - if sample_result && sample_result[primary_key] - id_handler(sample_result[primary_key], connection, source_table, primary_key) - else - # Default to numeric if we can't determine - Helpers::NumericHandler.new - end - end - - if (max_dest_id == 0 || max_dest_id == handler.min_value) && !options[:swapped] + if max_dest_id == 0 && !options[:swapped] min_source_id = source_table.min_id(primary_key, field, cast, starting_time, options[:where]) - if min_source_id - max_dest_id = handler.predecessor(min_source_id) - end + max_dest_id = min_source_id - 1 if min_source_id end starting_id = max_dest_id @@ -90,15 +67,14 @@ def fill(table) batch_size = options[:batch_size] i = 1 - batch_count = handler.batch_count(starting_id, max_source_id, batch_size) - first_batch = true + batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil if batch_count == 0 log_sql "/* nothing to fill */" end - while handler.should_continue?(starting_id, max_source_id) - where = handler.batch_where_condition(primary_key, starting_id, batch_size, first_batch && options[:start]) + while starting_id < max_source_id + where = "#{quote_ident(primary_key)} > #{quote(starting_id)} AND #{quote_ident(primary_key)} <= #{quote(starting_id + batch_size)}" if starting_time where << " AND #{quote_ident(field)} >= #{sql_date(starting_time, cast)} AND #{quote_ident(field)} < #{sql_date(ending_time, cast)}" end @@ -106,53 +82,19 @@ def fill(table) where << " AND #{options[:where]}" end - batch_label = batch_count ? "#{i} of #{batch_count}" : "batch #{i}" - - if handler.is_a?(UlidHandler) - # For ULIDs, use CTE with RETURNING to get max ID inserted - query = <<~SQL - /* #{batch_label} */ - WITH inserted_batch AS ( - INSERT INTO #{quote_table(dest_table)} (#{fields}) - SELECT #{fields} FROM #{quote_table(source_table)} - WHERE #{where} - ORDER BY #{quote_ident(primary_key)} - LIMIT #{batch_size} - ON CONFLICT DO NOTHING - RETURNING #{quote_ident(primary_key)} - ) - SELECT MAX(#{quote_ident(primary_key)}) as max_inserted_id FROM inserted_batch - SQL - - log_sql query - result = execute(query) - max_inserted_id = result[0]["max_inserted_id"] - puts "starting_id: #{starting_id}" - puts "max_inserted_id: #{max_inserted_id}" - - # If no records were inserted, break the loop - if max_inserted_id.nil? 
- break - end - - starting_id = max_inserted_id - else - query = <<~SQL - /* #{batch_label} */ - INSERT INTO #{quote_table(dest_table)} (#{fields}) - SELECT #{fields} FROM #{quote_table(source_table)} - WHERE #{where} - ON CONFLICT DO NOTHING - SQL - - run_query(query) - starting_id = handler.next_starting_id(starting_id, batch_size) - end - + query = <<~SQL + /* #{i} of #{batch_count} */ + INSERT INTO #{quote_table(dest_table)} (#{fields}) + SELECT #{fields} FROM #{quote_table(source_table)} + WHERE #{where} + SQL + + run_query(query) + + starting_id += batch_size i += 1 - first_batch = false - if options[:sleep] && handler.should_continue?(starting_id, max_source_id) + if options[:sleep] && starting_id <= max_source_id sleep(options[:sleep]) end end diff --git a/lib/pgslice/cli/synchronize.rb b/lib/pgslice/cli/synchronize.rb deleted file mode 100644 index a99db60..0000000 --- a/lib/pgslice/cli/synchronize.rb +++ /dev/null @@ -1,300 +0,0 @@ -module PgSlice - class CLI - desc "synchronize TABLE", "Synchronize data between two tables" - option :source_table, type: :string, desc: "Source table to compare (default: TABLE)" - option :target_table, type: :string, desc: "Target table to compare (default: TABLE_intermediate)" - option :primary_key, type: :string, desc: "Primary key column name" - option :start, type: :string, desc: "Primary key value to start synchronization at" - option :window_size, type: :numeric, default: 1000, desc: "Number of rows to synchronize per batch" - option :delay, type: :numeric, default: 0, desc: "Base delay in seconds between batches (M)" - option :delay_multiplier, type: :numeric, default: 0, desc: "Delay multiplier for batch time (P)" - option :read_only, type: :boolean, default: false, desc: "Log SQL statements instead of executing them" - def synchronize(table_name) - table = create_table(table_name) - - # Determine source and target tables - source_table = options[:source_table] ? create_table(options[:source_table]) : table - target_table = options[:target_table] ? create_table(options[:target_table]) : table.intermediate_table - - # Verify both tables exist - assert_table(source_table) - assert_table(target_table) - - # Get and verify schemas match - source_schema = get_table_schema(source_table) - target_schema = get_table_schema(target_table) - verify_schemas_match(source_table, target_table, source_schema, target_schema) - - # Get primary key - primary_key = options[:primary_key] || source_table.primary_key&.first - abort "Primary key not found. Specify with --primary-key" unless primary_key - abort "Primary key '#{primary_key}' not found in source table" unless source_schema[primary_key] - - # Determine starting value - starting_id = options[:start] - unless starting_id - starting_id = get_min_id(source_table, primary_key) - abort "No rows found in source table" unless starting_id - end - - # Get parameters - window_size = options[:window_size] - base_delay = options[:delay] - delay_multiplier = options[:delay_multiplier] - read_only = options[:read_only] - - log "Synchronizing #{source_table} to #{target_table}" - log "Mode: #{read_only ? 
'READ-ONLY (logging only)' : 'WRITE (executing changes)'}" - log "Primary key: #{primary_key}" - log "Starting at: #{starting_id}" - log "Window size: #{window_size}" - log "Base delay: #{base_delay}s" - log "Delay multiplier: #{delay_multiplier}" - log - - # Statistics - stats = { - total_rows: 0, - matching_rows: 0, - rows_with_differences: 0, - missing_rows: 0, - extra_rows: 0, - batches: 0 - } - - columns = source_schema.keys - - # Main synchronization loop - first_batch = true - loop do - batch_start_time = Time.now - - # Fetch batch from source - source_rows = fetch_batch(source_table, primary_key, starting_id, window_size, columns, first_batch) - break if source_rows.empty? - - stats[:batches] += 1 - first_batch = false - stats[:total_rows] += source_rows.size - - # Get primary keys and range from source batch - source_pks = source_rows.map { |row| row[primary_key] } - first_source_pk = source_rows.first[primary_key] - last_source_pk = source_rows.last[primary_key] - - # Fetch corresponding rows from target using range query to catch deletions - target_rows = fetch_rows_by_range(target_table, primary_key, first_source_pk, last_source_pk, columns) - target_rows_by_pk = target_rows.each_with_object({}) { |row, hash| hash[row[primary_key]] = row } - - # Compare and generate fix queries - fix_queries = [] - - source_rows.each do |source_row| - pk_value = source_row[primary_key] - target_row = target_rows_by_pk[pk_value] - - if target_row.nil? - # Missing row in target - stats[:missing_rows] += 1 - fix_queries << generate_insert(target_table, source_row, columns) - elsif rows_differ?(source_row, target_row, columns) - # Rows differ - stats[:rows_with_differences] += 1 - fix_queries << generate_update(target_table, primary_key, source_row, columns) - else - # Rows match - stats[:matching_rows] += 1 - end - end - - # Check for extra rows in target (rows in target but not in source batch) - # Note: This only checks within the current batch window - extra_pks = target_rows_by_pk.keys - source_pks - extra_pks.each do |pk_value| - stats[:extra_rows] += 1 - fix_queries << generate_delete(target_table, primary_key, pk_value) - end - - # Get first and last primary key for logging - first_pk = source_rows.first[primary_key] - last_pk = source_rows.last[primary_key] - pk_range = first_pk == last_pk ? "#{first_pk}" : "#{first_pk}...#{last_pk}" - - # Execute or log fix queries - if fix_queries.any? 
- log_with_timestamp "Batch #{stats[:batches]}: Found #{fix_queries.size} differences (keys in range #{pk_range})" - if read_only - log_sql "-- Read-only mode: logging statements (not executing)" - fix_queries.each { |query| log_sql query } - log_sql - else - # In write mode, log truncated SQL and execute without auto-logging - fix_queries.each { |query| log_sql truncate_sql_for_log(query) } - run_queries(fix_queries, silent: true) - end - else - log_with_timestamp "Batch #{stats[:batches]}: All #{source_rows.size} rows match (keys in range #{pk_range})" - end - - # Update starting_id for next batch (use > not >=) - starting_id = source_rows.last[primary_key] - - # Calculate adaptive delay: M + N*P - batch_duration = Time.now - batch_start_time - sleep_time = base_delay + (batch_duration * delay_multiplier) - if sleep_time > 0 - log_with_timestamp "Sleeping #{sleep_time.round(2)}s (#{base_delay}s base + #{batch_duration.round(2)}s batch time * #{delay_multiplier} multiplier)" - sleep(sleep_time) - end - - # Break if we processed fewer rows than window size (last batch) - break if source_rows.size < window_size - end - - # Print summary - log - log "Synchronization complete" - log "=" * 50 - log "Total batches: #{stats[:batches]}" - log "Total rows compared: #{stats[:total_rows]}" - log "Matching rows: #{stats[:matching_rows]}" - log "Rows with differences: #{stats[:rows_with_differences]}" - log "Missing rows: #{stats[:missing_rows]}" - log "Extra rows: #{stats[:extra_rows]}" - end - - private - - def log_with_timestamp(message) - timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S") - log "[#{timestamp}] #{message}" - end - - def get_table_schema(table) - query = <<~SQL - SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale - FROM information_schema.columns - WHERE table_schema = $1 AND table_name = $2 AND is_generated = 'NEVER' - ORDER BY ordinal_position - SQL - rows = execute(query, [table.schema, table.name]) - rows.each_with_object({}) do |row, hash| - hash[row["column_name"]] = { - data_type: row["data_type"], - character_maximum_length: row["character_maximum_length"], - numeric_precision: row["numeric_precision"], - numeric_scale: row["numeric_scale"] - } - end - end - - def verify_schemas_match(source_table, target_table, source_schema, target_schema) - source_schema.each do |col_name, col_spec| - target_spec = target_schema[col_name] - abort "Column '#{col_name}' exists in #{source_table} but not in #{target_table}" unless target_spec - - if col_spec[:data_type] != target_spec[:data_type] - abort "Column '#{col_name}' type mismatch: #{source_table} has #{col_spec[:data_type]}, #{target_table} has #{target_spec[:data_type]}" - end - end - - target_schema.each do |col_name, _| - abort "Column '#{col_name}' exists in #{target_table} but not in #{source_table}" unless source_schema[col_name] - end - end - - def get_min_id(table, primary_key) - query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(table)} ORDER BY #{quote_ident(primary_key)} LIMIT 1" - result = execute(query) - result.first&.values&.first - end - - def fetch_batch(table, primary_key, starting_id, limit, columns, first_batch = false) - column_list = columns.map { |c| quote_ident(c) }.join(", ") - # Use >= for first batch to include starting_id, > for subsequent batches - operator = first_batch ? 
">=" : ">" - query = <<~SQL - SELECT #{column_list} - FROM #{quote_table(table)} - WHERE #{quote_ident(primary_key)} #{operator} #{quote(starting_id)} - ORDER BY #{quote_ident(primary_key)} - LIMIT #{limit.to_i} - SQL - execute(query) - end - - def fetch_rows_by_pks(table, primary_key, pk_values, columns) - return [] if pk_values.empty? - - column_list = columns.map { |c| quote_ident(c) }.join(", ") - # Build IN clause with proper quoting - pk_list = pk_values.map { |pk| quote(pk) }.join(", ") - query = <<~SQL - SELECT #{column_list} - FROM #{quote_table(table)} - WHERE #{quote_ident(primary_key)} IN (#{pk_list}) - SQL - execute(query) - end - - def fetch_rows_by_range(table, primary_key, first_pk, last_pk, columns) - column_list = columns.map { |c| quote_ident(c) }.join(", ") - query = <<~SQL - SELECT #{column_list} - FROM #{quote_table(table)} - WHERE #{quote_ident(primary_key)} >= #{quote(first_pk)} - AND #{quote_ident(primary_key)} <= #{quote(last_pk)} - ORDER BY #{quote_ident(primary_key)} - SQL - execute(query) - end - - def rows_differ?(source_row, target_row, columns) - columns.any? { |col| source_row[col] != target_row[col] } - end - - def generate_insert(table, row, columns) - column_list = columns.map { |c| quote_ident(c) }.join(", ") - value_list = columns.map { |c| quote(row[c]) }.join(", ") - "INSERT INTO #{quote_table(table)} (#{column_list}) VALUES (#{value_list});" - end - - def generate_update(table, primary_key, row, columns) - set_clause = columns.reject { |c| c == primary_key }.map { |c| "#{quote_ident(c)} = #{quote(row[c])}" }.join(", ") - "UPDATE #{quote_table(table)} SET #{set_clause} WHERE #{quote_ident(primary_key)} = #{quote(row[primary_key])};" - end - - def generate_delete(table, primary_key, pk_value) - "DELETE FROM #{quote_table(table)} WHERE #{quote_ident(primary_key)} = #{quote(pk_value)};" - end - - def truncate_sql_for_log(sql) - # For INSERT statements: show "INSERT INTO table... VALUES(first 20 chars...[truncated]" - if sql =~ /\A(INSERT INTO [^\s]+)\s.*?\sVALUES\s*\((.*)\);?\z/i - table_part = $1 - values_part = $2 - preview = values_part[0, 20] - return "#{table_part}... VALUES(#{preview}...[truncated]" - end - - # For UPDATE statements: show "UPDATE table... SET...[truncated]" - if sql =~ /\A(UPDATE [^\s]+)\s+SET\s+(.*?)\s+WHERE/i - table_part = $1 - set_part = $2 - preview = set_part[0, 20] - return "#{table_part}... SET #{preview}...[truncated]" - end - - # For DELETE statements: show "DELETE FROM table WHERE...[truncated]" - if sql =~ /\A(DELETE FROM [^\s]+)\s+WHERE\s+(.*);?\z/i - table_part = $1 - where_part = $2 - preview = where_part[0, 20] - return "#{table_part}... 
WHERE #{preview}...[truncated]" - end - - # Fallback: just show first 50 chars - sql[0, 50] + "...[truncated]" - end - end -end diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb index cae872a..ee2b1b3 100644 --- a/lib/pgslice/helpers.rb +++ b/lib/pgslice/helpers.rb @@ -6,9 +6,6 @@ module Helpers year: "YYYY" } - # ULID epoch start corresponding to 01/01/1970 - DEFAULT_ULID = "00000H5A406P0C3DQMCQ5MV6WQ" - protected # output @@ -64,20 +61,18 @@ def execute(query, params = []) connection.exec_params(query, params).to_a end - def run_queries(queries, silent: false) + def run_queries(queries) connection.transaction do execute("SET LOCAL client_min_messages TO warning") unless options[:dry_run] - unless silent - log_sql "BEGIN;" - log_sql - end - run_queries_without_transaction(queries, silent: silent) - log_sql "COMMIT;" unless silent + log_sql "BEGIN;" + log_sql + run_queries_without_transaction(queries) + log_sql "COMMIT;" end end - def run_query(query, silent: false) - log_sql query unless silent + def run_query(query) + log_sql query unless options[:dry_run] begin execute(query) @@ -85,12 +80,12 @@ def run_query(query, silent: false) abort "#{e.class.name}: #{e.message}" end end - log_sql unless silent + log_sql end - def run_queries_without_transaction(queries, silent: false) + def run_queries_without_transaction(queries) queries.each do |query| - run_query(query, silent: silent) + run_query(query) end end @@ -172,9 +167,7 @@ def quote_ident(value) end def quote(value) - if value.nil? - "NULL" - elsif value.is_a?(Numeric) + if value.is_a?(Numeric) value else connection.escape_literal(value) @@ -185,108 +178,6 @@ def quote_table(table) table.quote_table end - # ULID helper methods - def ulid?(value) - return false unless value.is_a?(String) - # Match pure ULIDs or ULIDs with prefixes - value.match?(/\A[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) || - value.match?(/.*[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) - end - - def numeric_id?(value) - value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/)) - end - - def id_type(value) - return :numeric if numeric_id?(value) - return :ulid if ulid?(value) - :unknown - end - - # Factory method to get the appropriate ID handler - def id_handler(sample_id, connection = nil, table = nil, primary_key = nil) - if ulid?(sample_id) - UlidHandler.new(connection, table, primary_key) - else - NumericHandler.new - end - end - - class NumericHandler - def min_value - 1 - end - - def predecessor(id) - id - 1 - end - - def should_continue?(current_id, max_id) - current_id < max_id - end - - def batch_count(starting_id, max_id, batch_size) - ((max_id - starting_id) / batch_size.to_f).ceil - end - - def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) - helpers = PgSlice::CLI.instance - operator = inclusive ? 
">=" : ">" - "#{helpers.quote_ident(primary_key)} #{operator} #{helpers.quote(starting_id)} AND #{helpers.quote_ident(primary_key)} <= #{helpers.quote(starting_id + batch_size)}" - end - - def next_starting_id(starting_id, batch_size) - starting_id + batch_size - end - end - - class UlidHandler - def initialize(connection = nil, table = nil, primary_key = nil) - @connection = connection - @table = table - @primary_key = primary_key - end - - def min_value - PgSlice::Helpers::DEFAULT_ULID - end - - def predecessor(id) - # Use database lookup to find the actual predecessor - return PgSlice::Helpers::DEFAULT_ULID unless @connection && @table && @primary_key - - query = <<~SQL - SELECT MAX(#{PG::Connection.quote_ident(@primary_key)}) - FROM #{@table.quote_table} - WHERE #{PG::Connection.quote_ident(@primary_key)} < '#{id}' - SQL - - log_sql query - result = @connection.exec(query) - predecessor_id = result[0]["max"] - predecessor_id || PgSlice::Helpers::DEFAULT_ULID - end - - def should_continue?(current_id, max_id) - current_id < max_id - end - - def batch_count(starting_id, max_id, batch_size) - nil # Unknown for ULIDs - end - - def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) - operator = inclusive ? ">=" : ">" - "#{PG::Connection.quote_ident(primary_key)} #{operator} '#{starting_id}'" - end - - def next_starting_id(starting_id, batch_size) - # For ULIDs, we need to get the max ID from the current batch - # This will be handled in the fill logic - nil - end - end - def quote_no_schema(table) quote_ident(table.name) end @@ -314,85 +205,5 @@ def make_stat_def(stat_def, table) stat_name = "#{table}_#{m[1].split(", ").map { |v| v.gsub(/\W/i, "") }.join("_")}_stat" stat_def.sub(/ FROM \S+/, " FROM #{quote_table(table)}").sub(/ STATISTICS .+ ON /, " STATISTICS #{quote_ident(stat_name)} ON ") + ";" end - - # mirroring triggers - - def enable_mirroring_triggers(table) - intermediate_table = table.intermediate_table - function_name = "#{table.name}_mirror_to_intermediate" - trigger_name = "#{table.name}_mirror_trigger" - - queries = [] - - # create mirror function - queries << <<~SQL - CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}() - RETURNS TRIGGER AS $$ - BEGIN - IF TG_OP = 'DELETE' THEN - DELETE FROM #{quote_table(intermediate_table)} WHERE #{mirror_where_clause(table, 'OLD')}; - RETURN OLD; - ELSIF TG_OP = 'UPDATE' THEN - UPDATE #{quote_table(intermediate_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')}; - RETURN NEW; - ELSIF TG_OP = 'INSERT' THEN - INSERT INTO #{quote_table(intermediate_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)}); - RETURN NEW; - END IF; - RETURN NULL; - END; - $$ LANGUAGE plpgsql; - SQL - - # create trigger - queries << <<~SQL - CREATE TRIGGER #{quote_ident(trigger_name)} - AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)} - FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}(); - SQL - - run_queries(queries) - end - - def disable_mirroring_triggers(table) - function_name = "#{table.name}_mirror_to_intermediate" - trigger_name = "#{table.name}_mirror_trigger" - - queries = [] - - # drop trigger - queries << <<~SQL - DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)}; - SQL - - # drop function - queries << <<~SQL - DROP FUNCTION IF EXISTS #{quote_ident(function_name)}(); - SQL - - run_queries(queries) - end - - def mirror_column_list(table) - table.columns.map { |column| quote_ident(column) }.join(", ") - end - - def 
mirror_new_tuple_list(table) - table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ") - end - - def mirror_set_clause(table) - table.columns.map { |column| "#{quote_ident(column)} = NEW.#{quote_ident(column)}" }.join(", ") - end - - def mirror_where_clause(table, record) - primary_keys = table.primary_key - if primary_keys && primary_keys.any? - primary_keys.map { |pk| "#{quote_ident(pk)} = #{record}.#{quote_ident(pk)}" }.join(" AND ") - else - # fallback to all columns if no primary key - table.columns.map { |column| "#{quote_ident(column)} = #{record}.#{quote_ident(column)}" }.join(" AND ") - end - end end end diff --git a/lib/pgslice/table.rb b/lib/pgslice/table.rb index b3f7e7d..4e2284e 100644 --- a/lib/pgslice/table.rb +++ b/lib/pgslice/table.rb @@ -123,11 +123,7 @@ def max_id(primary_key, below: nil, where: nil) end conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? - result = execute(query, params)[0]["max"] - return result if result.nil? - - # For ULIDs, return as string; for numeric, convert to int - numeric_id?(result) ? result.to_i : result + execute(query, params)[0]["max"].to_i end def min_id(primary_key, column, cast, starting_time, where) @@ -136,23 +132,7 @@ def min_id(primary_key, column, cast, starting_time, where) conditions << "#{quote_ident(column)} >= #{sql_date(starting_time, cast)}" if starting_time conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? - result = execute(query)[0]["min"] - - # Return appropriate default and type based on primary key type - if result.nil? - # Check if we're dealing with ULIDs by sampling a row - sample_query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table} LIMIT 1" - sample_result = execute(sample_query)[0] - if sample_result - handler = id_handler(sample_result[primary_key]) - return handler.min_value - else - return 1 # Default numeric when no sample available - end - end - - # Return the actual result with proper type - numeric_id?(result) ? 
result.to_i : result + (execute(query)[0]["min"] || 1).to_i end # ensure this returns partitions in the correct order @@ -244,13 +224,5 @@ def quote_ident(value) def sql_date(*args) PgSlice::CLI.instance.send(:sql_date, *args) end - - def numeric_id?(value) - PgSlice::CLI.instance.send(:numeric_id?, value) - end - - def id_handler(sample_id) - PgSlice::CLI.instance.send(:id_handler, sample_id) - end end end From 0194f40d9c94decf296b879ae42447ea206d6c64 Mon Sep 17 00:00:00 2001 From: timeless Date: Wed, 3 Dec 2025 10:22:09 -0600 Subject: [PATCH 03/11] sync command --- lib/pgslice.rb | 1 + lib/pgslice/cli/synchronize.rb | 299 +++++++++++++++++++++++++++++++++ lib/pgslice/helpers.rb | 211 +++++++++++++++++++++-- 3 files changed, 500 insertions(+), 11 deletions(-) create mode 100644 lib/pgslice/cli/synchronize.rb diff --git a/lib/pgslice.rb b/lib/pgslice.rb index a8bf0cf..4732176 100644 --- a/lib/pgslice.rb +++ b/lib/pgslice.rb @@ -21,3 +21,4 @@ require_relative "pgslice/cli/swap" require_relative "pgslice/cli/unprep" require_relative "pgslice/cli/unswap" +require_relative "pgslice/cli/synchronize" diff --git a/lib/pgslice/cli/synchronize.rb b/lib/pgslice/cli/synchronize.rb new file mode 100644 index 0000000..d650ef9 --- /dev/null +++ b/lib/pgslice/cli/synchronize.rb @@ -0,0 +1,299 @@ +module PgSlice + class CLI + desc "synchronize TABLE", "Synchronize data between two tables" + option :source_table, type: :string, desc: "Source table to compare (default: TABLE)" + option :target_table, type: :string, desc: "Target table to compare (default: TABLE_intermediate)" + option :primary_key, type: :string, desc: "Primary key column name" + option :start, type: :string, desc: "Primary key value to start synchronization at" + option :window_size, type: :numeric, default: 1000, desc: "Number of rows to synchronize per batch" + option :delay, type: :numeric, default: 0, desc: "Base delay in seconds between batches (M)" + option :delay_multiplier, type: :numeric, default: 0, desc: "Delay multiplier for batch time (P)" + def synchronize(table_name) + table = create_table(table_name) + + # Determine source and target tables + source_table = options[:source_table] ? create_table(options[:source_table]) : table + target_table = options[:target_table] ? create_table(options[:target_table]) : table.intermediate_table + + # Verify both tables exist + assert_table(source_table) + assert_table(target_table) + + # Get and verify schemas match + source_schema = get_table_schema(source_table) + target_schema = get_table_schema(target_table) + verify_schemas_match(source_table, target_table, source_schema, target_schema) + + # Get primary key + primary_key = options[:primary_key] || source_table.primary_key&.first + abort "Primary key not found. Specify with --primary-key" unless primary_key + abort "Primary key '#{primary_key}' not found in source table" unless source_schema[primary_key] + + # Determine starting value + starting_id = options[:start] + unless starting_id + starting_id = get_min_id(source_table, primary_key) + abort "No rows found in source table" unless starting_id + end + + # Get parameters + window_size = options[:window_size] + base_delay = options[:delay] + delay_multiplier = options[:delay_multiplier] + dry_run = options[:dry_run] + + log "Synchronizing #{source_table} to #{target_table}" + log "Mode: #{dry_run ? 
'DRY RUN (logging only)' : 'WRITE (executing changes)'}" + log "Primary key: #{primary_key}" + log "Starting at: #{starting_id}" + log "Window size: #{window_size}" + log "Base delay: #{base_delay}s" + log "Delay multiplier: #{delay_multiplier}" + log + + # Statistics + stats = { + total_rows: 0, + matching_rows: 0, + rows_with_differences: 0, + missing_rows: 0, + extra_rows: 0, + batches: 0 + } + + columns = source_schema.keys + + # Main synchronization loop + first_batch = true + loop do + batch_start_time = Time.now + + # Fetch batch from source + source_rows = fetch_batch(source_table, primary_key, starting_id, window_size, columns, first_batch) + break if source_rows.empty? + + stats[:batches] += 1 + first_batch = false + stats[:total_rows] += source_rows.size + + # Get primary keys and range from source batch + source_pks = source_rows.map { |row| row[primary_key] } + first_source_pk = source_rows.first[primary_key] + last_source_pk = source_rows.last[primary_key] + + # Fetch corresponding rows from target using range query to catch deletions + target_rows = fetch_rows_by_range(target_table, primary_key, first_source_pk, last_source_pk, columns) + target_rows_by_pk = target_rows.each_with_object({}) { |row, hash| hash[row[primary_key]] = row } + + # Compare and generate fix queries + fix_queries = [] + + source_rows.each do |source_row| + pk_value = source_row[primary_key] + target_row = target_rows_by_pk[pk_value] + + if target_row.nil? + # Missing row in target + stats[:missing_rows] += 1 + fix_queries << generate_insert(target_table, source_row, columns) + elsif rows_differ?(source_row, target_row, columns) + # Rows differ + stats[:rows_with_differences] += 1 + fix_queries << generate_update(target_table, primary_key, source_row, columns) + else + # Rows match + stats[:matching_rows] += 1 + end + end + + # Check for extra rows in target (rows in target but not in source batch) + # Note: This only checks within the current batch window + extra_pks = target_rows_by_pk.keys - source_pks + extra_pks.each do |pk_value| + stats[:extra_rows] += 1 + fix_queries << generate_delete(target_table, primary_key, pk_value) + end + + # Get first and last primary key for logging + first_pk = source_rows.first[primary_key] + last_pk = source_rows.last[primary_key] + pk_range = first_pk == last_pk ? "#{first_pk}" : "#{first_pk}...#{last_pk}" + + # Execute or log fix queries + if fix_queries.any? 
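+          # Dry run logs every generated statement in full; write mode logs a
+          # truncated preview and applies the batch in a single transaction.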
+ log_with_timestamp "Batch #{stats[:batches]}: Found #{fix_queries.size} differences (keys in range #{pk_range})" + if dry_run + log_sql "-- Dry run mode: logging statements (not executing)" + fix_queries.each { |query| log_sql query } + log_sql + else + # In write mode, log truncated SQL and execute without auto-logging + fix_queries.each { |query| log_sql truncate_sql_for_log(query) } + run_queries(fix_queries, silent: true) + end + else + log_with_timestamp "Batch #{stats[:batches]}: All #{source_rows.size} rows match (keys in range #{pk_range})" + end + + # Update starting_id for next batch (use > not >=) + starting_id = source_rows.last[primary_key] + + # Calculate adaptive delay: M + N*P + batch_duration = Time.now - batch_start_time + sleep_time = base_delay + (batch_duration * delay_multiplier) + if sleep_time > 0 + log_with_timestamp "Sleeping #{sleep_time.round(2)}s (#{base_delay}s base + #{batch_duration.round(2)}s batch time * #{delay_multiplier} multiplier)" + sleep(sleep_time) + end + + # Break if we processed fewer rows than window size (last batch) + break if source_rows.size < window_size + end + + # Print summary + log + log "Synchronization complete" + log "=" * 50 + log "Total batches: #{stats[:batches]}" + log "Total rows compared: #{stats[:total_rows]}" + log "Matching rows: #{stats[:matching_rows]}" + log "Rows with differences: #{stats[:rows_with_differences]}" + log "Missing rows: #{stats[:missing_rows]}" + log "Extra rows: #{stats[:extra_rows]}" + end + + private + + def log_with_timestamp(message) + timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S") + log "[#{timestamp}] #{message}" + end + + def get_table_schema(table) + query = <<~SQL + SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale + FROM information_schema.columns + WHERE table_schema = $1 AND table_name = $2 AND is_generated = 'NEVER' + ORDER BY ordinal_position + SQL + rows = execute(query, [table.schema, table.name]) + rows.each_with_object({}) do |row, hash| + hash[row["column_name"]] = { + data_type: row["data_type"], + character_maximum_length: row["character_maximum_length"], + numeric_precision: row["numeric_precision"], + numeric_scale: row["numeric_scale"] + } + end + end + + def verify_schemas_match(source_table, target_table, source_schema, target_schema) + source_schema.each do |col_name, col_spec| + target_spec = target_schema[col_name] + abort "Column '#{col_name}' exists in #{source_table} but not in #{target_table}" unless target_spec + + if col_spec[:data_type] != target_spec[:data_type] + abort "Column '#{col_name}' type mismatch: #{source_table} has #{col_spec[:data_type]}, #{target_table} has #{target_spec[:data_type]}" + end + end + + target_schema.each do |col_name, _| + abort "Column '#{col_name}' exists in #{target_table} but not in #{source_table}" unless source_schema[col_name] + end + end + + def get_min_id(table, primary_key) + query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(table)} ORDER BY #{quote_ident(primary_key)} LIMIT 1" + result = execute(query) + result.first&.values&.first + end + + def fetch_batch(table, primary_key, starting_id, limit, columns, first_batch = false) + column_list = columns.map { |c| quote_ident(c) }.join(", ") + # Use >= for first batch to include starting_id, > for subsequent batches + operator = first_batch ? 
">=" : ">" + query = <<~SQL + SELECT #{column_list} + FROM #{quote_table(table)} + WHERE #{quote_ident(primary_key)} #{operator} #{quote(starting_id)} + ORDER BY #{quote_ident(primary_key)} + LIMIT #{limit.to_i} + SQL + execute(query) + end + + def fetch_rows_by_pks(table, primary_key, pk_values, columns) + return [] if pk_values.empty? + + column_list = columns.map { |c| quote_ident(c) }.join(", ") + # Build IN clause with proper quoting + pk_list = pk_values.map { |pk| quote(pk) }.join(", ") + query = <<~SQL + SELECT #{column_list} + FROM #{quote_table(table)} + WHERE #{quote_ident(primary_key)} IN (#{pk_list}) + SQL + execute(query) + end + + def fetch_rows_by_range(table, primary_key, first_pk, last_pk, columns) + column_list = columns.map { |c| quote_ident(c) }.join(", ") + query = <<~SQL + SELECT #{column_list} + FROM #{quote_table(table)} + WHERE #{quote_ident(primary_key)} >= #{quote(first_pk)} + AND #{quote_ident(primary_key)} <= #{quote(last_pk)} + ORDER BY #{quote_ident(primary_key)} + SQL + execute(query) + end + + def rows_differ?(source_row, target_row, columns) + columns.any? { |col| source_row[col] != target_row[col] } + end + + def generate_insert(table, row, columns) + column_list = columns.map { |c| quote_ident(c) }.join(", ") + value_list = columns.map { |c| quote(row[c]) }.join(", ") + "INSERT INTO #{quote_table(table)} (#{column_list}) VALUES (#{value_list});" + end + + def generate_update(table, primary_key, row, columns) + set_clause = columns.reject { |c| c == primary_key }.map { |c| "#{quote_ident(c)} = #{quote(row[c])}" }.join(", ") + "UPDATE #{quote_table(table)} SET #{set_clause} WHERE #{quote_ident(primary_key)} = #{quote(row[primary_key])};" + end + + def generate_delete(table, primary_key, pk_value) + "DELETE FROM #{quote_table(table)} WHERE #{quote_ident(primary_key)} = #{quote(pk_value)};" + end + + def truncate_sql_for_log(sql) + # For INSERT statements: show "INSERT INTO table... VALUES(first 20 chars...[truncated]" + if sql =~ /\A(INSERT INTO [^\s]+)\s.*?\sVALUES\s*\((.*)\);?\z/i + table_part = $1 + values_part = $2 + preview = values_part[0, 20] + return "#{table_part}... VALUES(#{preview}...[truncated]" + end + + # For UPDATE statements: show "UPDATE table... SET...[truncated]" + if sql =~ /\A(UPDATE [^\s]+)\s+SET\s+(.*?)\s+WHERE/i + table_part = $1 + set_part = $2 + preview = set_part[0, 20] + return "#{table_part}... SET #{preview}...[truncated]" + end + + # For DELETE statements: show "DELETE FROM table WHERE...[truncated]" + if sql =~ /\A(DELETE FROM [^\s]+)\s+WHERE\s+(.*);?\z/i + table_part = $1 + where_part = $2 + preview = where_part[0, 20] + return "#{table_part}... 
WHERE #{preview}...[truncated]" + end + + # Fallback: just show first 50 chars + sql[0, 50] + "...[truncated]" + end + end +end diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb index ee2b1b3..0c90bdd 100644 --- a/lib/pgslice/helpers.rb +++ b/lib/pgslice/helpers.rb @@ -6,6 +6,9 @@ module Helpers year: "YYYY" } + # ULID epoch start corresponding to 01/01/1970 + DEFAULT_ULID = "00000H5A406P0C3DQMCQ5MV6WQ" + protected # output @@ -61,18 +64,20 @@ def execute(query, params = []) connection.exec_params(query, params).to_a end - def run_queries(queries) + def run_queries(queries, silent: false) connection.transaction do execute("SET LOCAL client_min_messages TO warning") unless options[:dry_run] - log_sql "BEGIN;" - log_sql - run_queries_without_transaction(queries) - log_sql "COMMIT;" + unless silent + log_sql "BEGIN;" + log_sql + end + run_queries_without_transaction(queries, silent: silent) + log_sql "COMMIT;" unless silent end end - def run_query(query) - log_sql query + def run_query(query, silent: false) + log_sql query unless silent unless options[:dry_run] begin execute(query) @@ -80,12 +85,12 @@ def run_query(query) abort "#{e.class.name}: #{e.message}" end end - log_sql + log_sql unless silent end - def run_queries_without_transaction(queries) + def run_queries_without_transaction(queries, silent: false) queries.each do |query| - run_query(query) + run_query(query, silent: silent) end end @@ -167,7 +172,9 @@ def quote_ident(value) end def quote(value) - if value.is_a?(Numeric) + if value.nil? + "NULL" + elsif value.is_a?(Numeric) value else connection.escape_literal(value) @@ -178,6 +185,108 @@ def quote_table(table) table.quote_table end + # ULID helper methods + def ulid?(value) + return false unless value.is_a?(String) + # Match pure ULIDs or ULIDs with prefixes + value.match?(/\A[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) || + value.match?(/.*[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) + end + + def numeric_id?(value) + value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/)) + end + + def id_type(value) + return :numeric if numeric_id?(value) + return :ulid if ulid?(value) + :unknown + end + + # Factory method to get the appropriate ID handler + def id_handler(sample_id, connection = nil, table = nil, primary_key = nil) + if ulid?(sample_id) + UlidHandler.new(connection, table, primary_key) + else + NumericHandler.new + end + end + + class NumericHandler + def min_value + 1 + end + + def predecessor(id) + id - 1 + end + + def should_continue?(current_id, max_id) + current_id < max_id + end + + def batch_count(starting_id, max_id, batch_size) + ((max_id - starting_id) / batch_size.to_f).ceil + end + + def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) + helpers = PgSlice::CLI.instance + operator = inclusive ? 
">=" : ">" + "#{helpers.quote_ident(primary_key)} #{operator} #{helpers.quote(starting_id)} AND #{helpers.quote_ident(primary_key)} <= #{helpers.quote(starting_id + batch_size)}" + end + + def next_starting_id(starting_id, batch_size) + starting_id + batch_size + end + end + + class UlidHandler + def initialize(connection = nil, table = nil, primary_key = nil) + @connection = connection + @table = table + @primary_key = primary_key + end + + def min_value + PgSlice::Helpers::DEFAULT_ULID + end + + def predecessor(id) + # Use database lookup to find the actual predecessor + return PgSlice::Helpers::DEFAULT_ULID unless @connection && @table && @primary_key + + query = <<~SQL + SELECT MAX(#{PG::Connection.quote_ident(@primary_key)}) + FROM #{@table.quote_table} + WHERE #{PG::Connection.quote_ident(@primary_key)} < '#{id}' + SQL + + log_sql query + result = @connection.exec(query) + predecessor_id = result[0]["max"] + predecessor_id || PgSlice::Helpers::DEFAULT_ULID + end + + def should_continue?(current_id, max_id) + current_id < max_id + end + + def batch_count(starting_id, max_id, batch_size) + nil # Unknown for ULIDs + end + + def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) + operator = inclusive ? ">=" : ">" + "#{PG::Connection.quote_ident(primary_key)} #{operator} '#{starting_id}'" + end + + def next_starting_id(starting_id, batch_size) + # For ULIDs, we need to get the max ID from the current batch + # This will be handled in the fill logic + nil + end + end + def quote_no_schema(table) quote_ident(table.name) end @@ -205,5 +314,85 @@ def make_stat_def(stat_def, table) stat_name = "#{table}_#{m[1].split(", ").map { |v| v.gsub(/\W/i, "") }.join("_")}_stat" stat_def.sub(/ FROM \S+/, " FROM #{quote_table(table)}").sub(/ STATISTICS .+ ON /, " STATISTICS #{quote_ident(stat_name)} ON ") + ";" end + + # mirroring triggers + + def enable_mirroring_triggers(table) + intermediate_table = table.intermediate_table + function_name = "#{table.name}_mirror_to_intermediate" + trigger_name = "#{table.name}_mirror_trigger" + + queries = [] + + # create mirror function + queries << <<~SQL + CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}() + RETURNS TRIGGER AS $$ + BEGIN + IF TG_OP = 'DELETE' THEN + DELETE FROM #{quote_table(intermediate_table)} WHERE #{mirror_where_clause(table, 'OLD')}; + RETURN OLD; + ELSIF TG_OP = 'UPDATE' THEN + UPDATE #{quote_table(intermediate_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')}; + RETURN NEW; + ELSIF TG_OP = 'INSERT' THEN + INSERT INTO #{quote_table(intermediate_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)}); + RETURN NEW; + END IF; + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + SQL + + # create trigger + queries << <<~SQL + CREATE TRIGGER #{quote_ident(trigger_name)} + AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)} + FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}(); + SQL + + run_queries(queries) + end + + def disable_mirroring_triggers(table) + function_name = "#{table.name}_mirror_to_intermediate" + trigger_name = "#{table.name}_mirror_trigger" + + queries = [] + + # drop trigger + queries << <<~SQL + DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)}; + SQL + + # drop function + queries << <<~SQL + DROP FUNCTION IF EXISTS #{quote_ident(function_name)}(); + SQL + + run_queries(queries) + end + + def mirror_column_list(table) + table.columns.map { |column| quote_ident(column) }.join(", ") + end + + def 
mirror_new_tuple_list(table)
+      table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ")
+    end
+
+    def mirror_set_clause(table)
+      table.columns.map { |column| "#{quote_ident(column)} = NEW.#{quote_ident(column)}" }.join(", ")
+    end
+
+    def mirror_where_clause(table, record)
+      primary_keys = table.primary_key
+      if primary_keys && primary_keys.any?
+        primary_keys.map { |pk| "#{quote_ident(pk)} = #{record}.#{quote_ident(pk)}" }.join(" AND ")
+      else
+        # fallback to all columns if no primary key
+        table.columns.map { |column| "#{quote_ident(column)} = #{record}.#{quote_ident(column)}" }.join(" AND ")
+      end
+    end
   end
 end

From ade09c5eb3155c673dc5bba5ce7f10a9117840fa Mon Sep 17 00:00:00 2001
From: timeless
Date: Wed, 3 Dec 2025 10:42:46 -0600
Subject: [PATCH 04/11] README update too

---
 README.md | 25 ++++++++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index 5ab4c06..ce5a51d 100644
--- a/README.md
+++ b/README.md
@@ -62,7 +62,24 @@ You can also install it with [Homebrew](#homebrew) or [Docker](#docker).
 pgslice analyze
 ```
 
-7. Swap the intermediate table with the original table
+7. Sync/Validate the tables
+
+This verifies the two tables are actually in sync. It should be a no-op, but it will generate
+INSERT, UPDATE, and DELETE statements if discrepancies are discovered. On a production system,
+make sure you understand the `--window-size`, `--delay`, and `--delay-multiplier` options.
+
+```sh
+pgslice synchronize <table> [options]
+```
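+
+For example, to compare 500 rows per batch and sleep 0.5s plus twice each batch's runtime
+between batches, you could run the following (illustrative values; tune them for your table):
+
+```sh
+pgslice synchronize <table> --window-size 500 --delay 0.5 --delay-multiplier 2
+```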
+
+8. Swap the intermediate table with the original table
 
 ```sh
 pgslice swap <table>
@@ -70,13 +87,15 @@ You can also install it with [Homebrew](#homebrew) or [Docker](#docker).
 
 The original table is renamed `<table>_retired` and the intermediate table is renamed `<table>`.
 
-8. Fill the rest (rows inserted between the first fill and the swap)
+9. Fill the rest (rows inserted between the first fill and the swap)
+
+This step should not be needed if you ran `pgslice synchronize` in step 7.
 
 ```sh
 pgslice fill <table> --swapped
 ```
 
-9. Back up the retired table with a tool like [pg_dump](https://www.postgresql.org/docs/current/static/app-pgdump.html) and drop it
+10. Back up the retired table with a tool like [pg_dump](https://www.postgresql.org/docs/current/static/app-pgdump.html) and drop it
 
 ```sql
 pg_dump -c -Fc -t <table>_retired $PGSLICE_URL > <table>_retired.dump

From d193e600de925534d0c31a8552ccc5a501d2c349 Mon Sep 17 00:00:00 2001
From: timeless
Date: Wed, 3 Dec 2025 11:36:55 -0600
Subject: [PATCH 05/11] retired mirroring

---
 lib/pgslice.rb                               |  2 +
 lib/pgslice/cli/disable_retired_mirroring.rb | 13 +++++
 lib/pgslice/cli/enable_retired_mirroring.rb  | 15 +++++
 lib/pgslice/helpers.rb                       | 58 ++++++++++++++++++++
 4 files changed, 88 insertions(+)
 create mode 100644 lib/pgslice/cli/disable_retired_mirroring.rb
 create mode 100644 lib/pgslice/cli/enable_retired_mirroring.rb

diff --git a/lib/pgslice.rb b/lib/pgslice.rb
index 4732176..faff252 100644
--- a/lib/pgslice.rb
+++ b/lib/pgslice.rb
@@ -22,3 +22,5 @@
 require_relative "pgslice/cli/unprep"
 require_relative "pgslice/cli/unswap"
 require_relative "pgslice/cli/synchronize"
+require_relative "pgslice/cli/enable_retired_mirroring"
+require_relative "pgslice/cli/disable_retired_mirroring"

diff --git a/lib/pgslice/cli/disable_retired_mirroring.rb b/lib/pgslice/cli/disable_retired_mirroring.rb
new file mode 100644
index 0000000..112a631
--- /dev/null
+++ b/lib/pgslice/cli/disable_retired_mirroring.rb
@@ -0,0 +1,13 @@
+module PgSlice
+  class CLI
+    desc "disable_retired_mirroring TABLE", "Disable mirroring triggers from TABLE to TABLE_retired"
+    def disable_retired_mirroring(table_name)
+      table = create_table(table_name)
+
+      assert_table(table)
+
+      disable_retired_mirroring_triggers(table)
+      log("Retired mirroring triggers disabled for #{table_name}")
+    end
+  end
+end

diff --git a/lib/pgslice/cli/enable_retired_mirroring.rb b/lib/pgslice/cli/enable_retired_mirroring.rb
new file mode 100644
index 0000000..a6ed602
--- /dev/null
+++ b/lib/pgslice/cli/enable_retired_mirroring.rb
@@ -0,0 +1,15 @@
+module PgSlice
+  class CLI
+    desc "enable_retired_mirroring TABLE", "Enable mirroring triggers from TABLE to TABLE_retired"
+    def enable_retired_mirroring(table_name)
+      table = create_table(table_name)
+      retired_table = table.retired_table
+
+      assert_table(table)
+      assert_table(retired_table)
+
+      enable_retired_mirroring_triggers(table)
+      log("Retired mirroring triggers enabled for #{table_name}")
+    end
+  end
+end

diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb
index 0c90bdd..732f9ec 100644
--- a/lib/pgslice/helpers.rb
+++ b/lib/pgslice/helpers.rb
@@ -394,5 +394,63 @@ def mirror_where_clause(table, record)
         table.columns.map { |column| "#{quote_ident(column)} = #{record}.#{quote_ident(column)}" }.join(" AND ")
       end
     end
+
+    # retired mirroring triggers
+
+    def enable_retired_mirroring_triggers(table)
+      retired_table = table.retired_table
+      function_name = "#{table.name}_mirror_to_retired"
+      trigger_name = "#{table.name}_retired_mirror_trigger"
+
+      queries = []
+
+      # create mirror function
+      queries << <<~SQL
+        CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}()
+        RETURNS TRIGGER AS $$
+        BEGIN
+          IF TG_OP = 'DELETE' THEN
+            DELETE FROM #{quote_table(retired_table)} WHERE #{mirror_where_clause(table, 'OLD')};
+            RETURN OLD;
+          ELSIF TG_OP = 'UPDATE' THEN
+            UPDATE #{quote_table(retired_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')};
+            RETURN NEW;
+          ELSIF TG_OP = 'INSERT' THEN
+            INSERT INTO #{quote_table(retired_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)});
+            RETURN NEW;
+          END IF;
+          RETURN NULL;
+        END;
+        $$ LANGUAGE plpgsql;
+      SQL
+
+      # create trigger
+      queries << <<~SQL
+        CREATE TRIGGER #{quote_ident(trigger_name)}
+        AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)}
+        FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}();
+      SQL
+
+      run_queries(queries)
+    end
+
+    def disable_retired_mirroring_triggers(table)
+      function_name = "#{table.name}_mirror_to_retired"
+      trigger_name = "#{table.name}_retired_mirror_trigger"
+
+      queries = []
+
+      # drop trigger
+      queries << <<~SQL
+        DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)};
+      SQL
+
+      # drop function
+      queries << <<~SQL
+        DROP FUNCTION IF EXISTS #{quote_ident(function_name)}();
+      SQL
+
+      run_queries(queries)
+    end
   end
 end

From ff3c8162df2d193be575fbc91f75940171fec644 Mon Sep 17 00:00:00 2001
From: timeless
Date: Wed, 3 Dec 2025 11:40:35 -0600
Subject: [PATCH 06/11] Docs for retired table mirroring

---
 README.md | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/README.md b/README.md
index ce5a51d..b4a62cf 100644
--- a/README.md
+++ b/README.md
@@ -87,6 +87,25 @@ pgslice synchronize <table> [options]
 
 The original table is renamed `<table>_retired` and the intermediate table is renamed `<table>`.
 
+Optional: Enable reverse mirroring (now-partitioned table to retired table)
+
+This makes unswapping later less problematic, since the two tables are kept in sync from this
+point on. Changes made between the swap and this step are not mirrored, so find an ID from
+before the swap and re-run the synchronize command from step 7 to catch those rows.
+
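+Under the hood, `enable_retired_mirroring` installs a row-level trigger pair along these lines
+(a sketch; the function and trigger names are derived from the table name):
+
+```sql
+CREATE TRIGGER "<table>_retired_mirror_trigger"
+AFTER INSERT OR UPDATE OR DELETE ON "<table>"
+FOR EACH ROW EXECUTE FUNCTION "<table>_mirror_to_retired"();
+```
+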
+```sh
+pgslice enable_retired_mirroring <table> # undo with pgslice disable_retired_mirroring <table>
+``` + 9. Fill the rest (rows inserted between the first fill and the swap) This step should not be needed if you did the pgslice synchronize in step 7. From 8144adae36779fee907e1c5a6fd3bc978d9a50dd Mon Sep 17 00:00:00 2001 From: timeless Date: Fri, 5 Dec 2025 13:13:19 -0600 Subject: [PATCH 07/11] dont need ulid helper yet --- lib/pgslice/helpers.rb | 8 -------- 1 file changed, 8 deletions(-) diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb index 732f9ec..5eb6276 100644 --- a/lib/pgslice/helpers.rb +++ b/lib/pgslice/helpers.rb @@ -185,14 +185,6 @@ def quote_table(table) table.quote_table end - # ULID helper methods - def ulid?(value) - return false unless value.is_a?(String) - # Match pure ULIDs or ULIDs with prefixes - value.match?(/\A[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) || - value.match?(/.*[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) - end - def numeric_id?(value) value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/)) end From 88ccf050cb79c0d7d163cf26be70d8c1715833cc Mon Sep 17 00:00:00 2001 From: timeless Date: Fri, 5 Dec 2025 13:15:56 -0600 Subject: [PATCH 08/11] not needed for sync --- lib/pgslice/helpers.rb | 6 ------ 1 file changed, 6 deletions(-) diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb index 5eb6276..d9bf187 100644 --- a/lib/pgslice/helpers.rb +++ b/lib/pgslice/helpers.rb @@ -189,12 +189,6 @@ def numeric_id?(value) value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/)) end - def id_type(value) - return :numeric if numeric_id?(value) - return :ulid if ulid?(value) - :unknown - end - # Factory method to get the appropriate ID handler def id_handler(sample_id, connection = nil, table = nil, primary_key = nil) if ulid?(sample_id) From 79cb8c059b8a7e27b382e0dad86f42e44c081f42 Mon Sep 17 00:00:00 2001 From: timeless Date: Fri, 5 Dec 2025 13:17:58 -0600 Subject: [PATCH 09/11] none of this needed for sync --- lib/pgslice/helpers.rb | 91 ------------------------------------------ 1 file changed, 91 deletions(-) diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb index d9bf187..cffd8bb 100644 --- a/lib/pgslice/helpers.rb +++ b/lib/pgslice/helpers.rb @@ -6,9 +6,6 @@ module Helpers year: "YYYY" } - # ULID epoch start corresponding to 01/01/1970 - DEFAULT_ULID = "00000H5A406P0C3DQMCQ5MV6WQ" - protected # output @@ -185,94 +182,6 @@ def quote_table(table) table.quote_table end - def numeric_id?(value) - value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/)) - end - - # Factory method to get the appropriate ID handler - def id_handler(sample_id, connection = nil, table = nil, primary_key = nil) - if ulid?(sample_id) - UlidHandler.new(connection, table, primary_key) - else - NumericHandler.new - end - end - - class NumericHandler - def min_value - 1 - end - - def predecessor(id) - id - 1 - end - - def should_continue?(current_id, max_id) - current_id < max_id - end - - def batch_count(starting_id, max_id, batch_size) - ((max_id - starting_id) / batch_size.to_f).ceil - end - - def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) - helpers = PgSlice::CLI.instance - operator = inclusive ? 
">=" : ">" - "#{helpers.quote_ident(primary_key)} #{operator} #{helpers.quote(starting_id)} AND #{helpers.quote_ident(primary_key)} <= #{helpers.quote(starting_id + batch_size)}" - end - - def next_starting_id(starting_id, batch_size) - starting_id + batch_size - end - end - - class UlidHandler - def initialize(connection = nil, table = nil, primary_key = nil) - @connection = connection - @table = table - @primary_key = primary_key - end - - def min_value - PgSlice::Helpers::DEFAULT_ULID - end - - def predecessor(id) - # Use database lookup to find the actual predecessor - return PgSlice::Helpers::DEFAULT_ULID unless @connection && @table && @primary_key - - query = <<~SQL - SELECT MAX(#{PG::Connection.quote_ident(@primary_key)}) - FROM #{@table.quote_table} - WHERE #{PG::Connection.quote_ident(@primary_key)} < '#{id}' - SQL - - log_sql query - result = @connection.exec(query) - predecessor_id = result[0]["max"] - predecessor_id || PgSlice::Helpers::DEFAULT_ULID - end - - def should_continue?(current_id, max_id) - current_id < max_id - end - - def batch_count(starting_id, max_id, batch_size) - nil # Unknown for ULIDs - end - - def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) - operator = inclusive ? ">=" : ">" - "#{PG::Connection.quote_ident(primary_key)} #{operator} '#{starting_id}'" - end - - def next_starting_id(starting_id, batch_size) - # For ULIDs, we need to get the max ID from the current batch - # This will be handled in the fill logic - nil - end - end - def quote_no_schema(table) quote_ident(table.name) end From 1c8857f9f6a2cf04a25bb721906ff1e42f82887d Mon Sep 17 00:00:00 2001 From: timeless Date: Fri, 5 Dec 2025 13:23:14 -0600 Subject: [PATCH 10/11] intermediate mirroring oops --- lib/pgslice/helpers.rb | 90 ++++++++---------------------------------- 1 file changed, 16 insertions(+), 74 deletions(-) diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb index cffd8bb..7c6e41c 100644 --- a/lib/pgslice/helpers.rb +++ b/lib/pgslice/helpers.rb @@ -210,12 +210,12 @@ def make_stat_def(stat_def, table) stat_def.sub(/ FROM \S+/, " FROM #{quote_table(table)}").sub(/ STATISTICS .+ ON /, " STATISTICS #{quote_ident(stat_name)} ON ") + ";" end - # mirroring triggers + # retired mirroring triggers - def enable_mirroring_triggers(table) - intermediate_table = table.intermediate_table - function_name = "#{table.name}_mirror_to_intermediate" - trigger_name = "#{table.name}_mirror_trigger" + def enable_retired_mirroring_triggers(table) + retired_table = table.retired_table + function_name = "#{table.name}_mirror_to_retired" + trigger_name = "#{table.name}_retired_mirror_trigger" queries = [] @@ -225,13 +225,13 @@ def enable_mirroring_triggers(table) RETURNS TRIGGER AS $$ BEGIN IF TG_OP = 'DELETE' THEN - DELETE FROM #{quote_table(intermediate_table)} WHERE #{mirror_where_clause(table, 'OLD')}; + DELETE FROM #{quote_table(retired_table)} WHERE #{mirror_where_clause(table, 'OLD')}; RETURN OLD; ELSIF TG_OP = 'UPDATE' THEN - UPDATE #{quote_table(intermediate_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')}; + UPDATE #{quote_table(retired_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')}; RETURN NEW; ELSIF TG_OP = 'INSERT' THEN - INSERT INTO #{quote_table(intermediate_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)}); + INSERT INTO #{quote_table(retired_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)}); RETURN NEW; END IF; RETURN NULL; 
@@ -249,37 +249,6 @@ def enable_mirroring_triggers(table) run_queries(queries) end - def disable_mirroring_triggers(table) - function_name = "#{table.name}_mirror_to_intermediate" - trigger_name = "#{table.name}_mirror_trigger" - - queries = [] - - # drop trigger - queries << <<~SQL - DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)}; - SQL - - # drop function - queries << <<~SQL - DROP FUNCTION IF EXISTS #{quote_ident(function_name)}(); - SQL - - run_queries(queries) - end - - def mirror_column_list(table) - table.columns.map { |column| quote_ident(column) }.join(", ") - end - - def mirror_new_tuple_list(table) - table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ") - end - - def mirror_set_clause(table) - table.columns.map { |column| "#{quote_ident(column)} = NEW.#{quote_ident(column)}" }.join(", ") - end - def mirror_where_clause(table, record) primary_keys = table.primary_key if primary_keys && primary_keys.any? @@ -290,43 +259,16 @@ def mirror_where_clause(table, record) end end - # retired mirroring triggers - - def enable_retired_mirroring_triggers(table) - retired_table = table.retired_table - function_name = "#{table.name}_mirror_to_retired" - trigger_name = "#{table.name}_retired_mirror_trigger" - - queries = [] - - # create mirror function - queries << <<~SQL - CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}() - RETURNS TRIGGER AS $$ - BEGIN - IF TG_OP = 'DELETE' THEN - DELETE FROM #{quote_table(retired_table)} WHERE #{mirror_where_clause(table, 'OLD')}; - RETURN OLD; - ELSIF TG_OP = 'UPDATE' THEN - UPDATE #{quote_table(retired_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')}; - RETURN NEW; - ELSIF TG_OP = 'INSERT' THEN - INSERT INTO #{quote_table(retired_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)}); - RETURN NEW; - END IF; - RETURN NULL; - END; - $$ LANGUAGE plpgsql; - SQL + def mirror_set_clause(table) + table.columns.map { |column| "#{quote_ident(column)} = NEW.#{quote_ident(column)}" }.join(", ") + end - # create trigger - queries << <<~SQL - CREATE TRIGGER #{quote_ident(trigger_name)} - AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)} - FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}(); - SQL + def mirror_column_list(table) + table.columns.map { |column| quote_ident(column) }.join(", ") + end - run_queries(queries) + def mirror_new_tuple_list(table) + table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ") end def disable_retired_mirroring_triggers(table) From a8ee674c3c715ae1c14232df4f3924232b634a81 Mon Sep 17 00:00:00 2001 From: timeless Date: Fri, 5 Dec 2025 13:38:19 -0600 Subject: [PATCH 11/11] tests for new commands --- test/pgslice_test.rb | 62 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/test/pgslice_test.rb b/test/pgslice_test.rb index cf8c062..e9ded63 100644 --- a/test/pgslice_test.rb +++ b/test/pgslice_test.rb @@ -144,6 +144,64 @@ def test_unprep_missing_table assert_error "Table not found", "unprep Items" end + def test_synchronize_missing_table + assert_error "Table not found", "synchronize Items" + end + + def test_synchronize + run_command "prep Posts --no-partition" + assert table_exists?("Posts_intermediate") + + run_command "fill Posts" + assert_equal 10000, count("Posts_intermediate") + + # Modify a row in source to create a difference (use a nullable column) + execute %!UPDATE "Posts" SET "createdAt" = '2020-01-01' WHERE "Id" = 1! 
+ + # Run synchronize + run_command "synchronize Posts --window-size 1000", allow_stderr: true + + # Verify the difference was fixed + result = execute(%!SELECT "createdAt" FROM "Posts_intermediate" WHERE "Id" = 1!) + assert_equal "2020-01-01 00:00:00", result.first["createdAt"] + + run_command "unprep Posts" + end + + def test_enable_retired_mirroring_missing_table + assert_error "Table not found", "enable_retired_mirroring Items" + end + + def test_enable_retired_mirroring + run_command "prep Posts --no-partition" + run_command "fill Posts" + run_command "swap Posts" + assert table_exists?("Posts_retired") + + run_command "enable_retired_mirroring Posts", allow_stderr: true + + run_command "disable_retired_mirroring Posts", allow_stderr: true + run_command "unswap Posts" + run_command "unprep Posts" + end + + def test_disable_retired_mirroring_missing_table + assert_error "Table not found", "disable_retired_mirroring Items" + end + + def test_disable_retired_mirroring + run_command "prep Posts --no-partition" + run_command "fill Posts" + run_command "swap Posts" + assert table_exists?("Posts_retired") + + run_command "enable_retired_mirroring Posts", allow_stderr: true + run_command "disable_retired_mirroring Posts", allow_stderr: true + + run_command "unswap Posts" + run_command "unprep Posts" + end + private def assert_period(period, column: "createdAt", trigger_based: false, tablespace: false, version: nil) @@ -277,7 +335,7 @@ def assert_error(message, command) run_command command, error: message end - def run_command(command, error: nil) + def run_command(command, error: nil, allow_stderr: false) if verbose? puts "$ pgslice #{command}" puts @@ -291,7 +349,7 @@ def run_command(command, error: nil) end if error assert_match error, stderr - else + elsif !allow_stderr assert_equal "", stderr end stdout